Passed
Push — master ( c4d3df...2baa92 )
by Vinicius
02:42 queued 16s
created

build.main.Main._is_duplicated_evc()   A

Complexity

Conditions 5

Size

Total Lines 15
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 5

Importance

Changes 0
Metric Value
cc 5
eloc 6
nop 2
dl 0
loc 15
ccs 5
cts 5
cp 1
crap 5
rs 9.3333
c 0
b 0
f 0
1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from threading import Lock
10
11 1
from pydantic import ValidationError
12
13 1
from kytos.core import KytosNApp, log, rest
14 1
from kytos.core.events import KytosEvent
15 1
from kytos.core.exceptions import KytosTagError
16 1
from kytos.core.helpers import (alisten_to, listen_to, load_spec,
17
                                validate_openapi)
18 1
from kytos.core.interface import TAG, UNI, TAGRange
19 1
from kytos.core.link import Link
20 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
21
                                 get_json_or_400)
22 1
from kytos.core.tag_ranges import get_tag_ranges
23 1
from napps.kytos.mef_eline import controllers, settings
24 1
from napps.kytos.mef_eline.exceptions import DisabledSwitch, InvalidPath
25 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
26
                                          Path)
27 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
28 1
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
29
                                         emit_event, get_vlan_tags_and_masks,
30
                                         map_evc_event_content)
31
32
33
# pylint: disable=too-many-public-methods
34 1
class Main(KytosNApp):
35
    """Main class of amlight/mef_eline NApp.
36
37
    This class is the entry point for this napp.
38
    """
39
40 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
41
42 1
    def setup(self):
        """Initialise the NApp state in place of ``__init__``.

        Kytos invokes this hook automatically when the application is
        loaded, so every attribute the NApp relies on is created here.
        """
        # Scheduler for the time-based circuit events.
        self.sched = Scheduler()

        # Persistence layer used to save and load circuits; its indexes are
        # bootstrapped right away.
        self.mongo_controller = self.get_eline_controller()
        self.mongo_controller.bootstrap_indexes()

        # Hand the Kytos controller to the dynamic path manager.
        DynamicPathManager.set_controller(self.controller)

        # In-memory buffer of the EVCs created so far. Every
        # create/update/delete must also be synced to mongodb.
        self.circuits = {}

        self.table_group = dict(epl=0, evpl=0)
        self._lock = Lock()
        # NOTE(review): presumably makes Kytos call ``execute`` repeatedly
        # with DEPLOY_EVCS_INTERVAL as the period - confirm in KytosNApp.
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

        self.load_all_evcs()
        self._topology_updated_at = None
70
71 1
    def get_evcs_by_svc_level(self) -> list:
        """Return the buffered circuits ordered for processing.

        Circuits with a higher ``service_level`` come first; circuits with
        the same level are ordered by ascending ``creation_time``.

        In the future, as more operations are offloaded, this ordering
        should come from the database instead.
        """
        def priority(evc):
            # Negated level gives descending order with an ascending sort.
            return (-evc.service_level, evc.creation_time)

        ordered = list(self.circuits.values())
        ordered.sort(key=priority)
        return ordered
78
79 1
    @staticmethod
    def get_eline_controller():
        """Build and return a new ``controllers.ELineController``."""
        eline_controller = controllers.ELineController()
        return eline_controller
83
84 1
    def execute(self):
        """Run one pass of the consistency routine.

        Nothing happens when ``self._lock`` is already held; otherwise the
        routine runs while holding that lock.
        """
        if not self._lock.locked():
            log.debug("Starting consistency routine")
            with self._lock:
                self.execute_consistency()
            log.debug("Finished consistency routine")
92
93 1
    def should_be_checked(self, circuit):
        """Tell whether the circuit qualifies for the consistency check.

        A circuit qualifies when it is enabled but not active, its lock is
        free, it has no recently removed flow, it was not recently updated,
        its UNIs are active and it is either intra-switch or has a
        ``current_path``.
        """
        if not circuit.is_enabled() or circuit.is_active():
            return False
        if circuit.lock.locked():
            return False
        if circuit.has_recent_removed_flow() or circuit.is_recent_updated():
            return False
        if not circuit.are_unis_active(self.controller.switches):
            return False
        # If an inter-switch EVC does not have current_path, it does not
        # make sense to run sdntrace on it.
        if circuit.is_intra_switch():
            return True
        return bool(circuit.current_path)
109
110 1
    def execute_consistency(self):
        """Reconcile the in-memory circuits with their expected state.

        Circuits that pass ``should_be_checked`` are traced in one batch.
        Those whose trace succeeds are activated and synced. The others
        have ``execution_rounds`` incremented and are redeployed once that
        counter exceeds ``settings.WAIT_FOR_OLD_PATH``. Finally, circuits
        stored in mongodb but missing from the buffer are loaded.
        """
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
        circuits_to_check = []
        for circuit in self.get_evcs_by_svc_level():
            # Whatever is left in stored_circuits afterwards is not loaded.
            stored_circuits.pop(circuit.id, None)
            if self.should_be_checked(circuit):
                circuits_to_check.append(circuit)

        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
        for circuit in circuits_to_check:
            if circuits_checked.get(circuit.id):
                circuit.execution_rounds = 0
                log.info(f"{circuit} enabled but inactive - activating")
                with circuit.lock:
                    circuit.activate()
                    circuit.sync()
                continue

            circuit.execution_rounds += 1
            if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
                log.info(f"{circuit} enabled but inactive - redeploy")
                with circuit.lock:
                    circuit.deploy()

        for circuit_id, stored_circuit in stored_circuits.items():
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
            self._load_evc(stored_circuit)
136
137 1
    def shutdown(self):
        """Hook executed when the NApp is unloaded.

        No cleanup is needed at the moment; add it here if that changes.
        """
142
143 1
    @rest("/v2/evc/", methods=["GET"])
    def list_circuits(self, request: Request) -> JSONResponse:
        """Endpoint returning the stored circuits.

        The ``archived`` query argument (default ``"false"``) is
        lower-cased and forwarded to the storage controller; every other
        query argument is forwarded as a metadata filter.
        """
        log.debug("list_circuits /v2/evc")
        query = request.query_params
        archived = query.get("archived", "false").lower()
        metadata = {
            key: value for key, value in query.items() if key != "archived"
        }
        found = self.mongo_controller.get_circuits(archived=archived,
                                                   metadata=metadata)
        return JSONResponse(found['circuits'])
158
159 1
    @rest("/v2/evc/schedule", methods=["GET"])
    def list_schedules(self, _request: Request) -> JSONResponse:
        """Endpoint returning every schedule stored for every circuit.

        The response body is a JSON list shaped as:
        [{"schedule_id": <schedule_id>,
         "circuit_id": <circuit_id>,
         "schedule": <schedule object>}]

        When no circuit is stored at all, an empty JSON object is returned
        instead of a list.
        """
        log.debug("list_schedules /v2/evc/schedule")
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
        status = 200
        if not circuits:
            return JSONResponse({}, status_code=status)

        result = []
        for circuit in circuits:
            # A missing or empty scheduler list contributes no entries.
            result.extend(
                {
                    "schedule_id": scheduler.get("id"),
                    "circuit_id": circuit.get("id"),
                    "schedule": scheduler,
                }
                for scheduler in circuit.get("circuit_scheduler") or ()
            )

        log.debug("list_schedules result %s %s", result, status)
        return JSONResponse(result, status_code=status)
190
191 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
    def get_circuit(self, request: Request) -> JSONResponse:
        """Endpoint returning the stored circuit with the given id.

        Raises:
            HTTPException: 404 when the storage has no such circuit.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("get_circuit /v2/evc/%s", circuit_id)
        circuit = self.mongo_controller.get_circuit(circuit_id)
        if circuit:
            log.debug("get_circuit result %s %s", circuit, 200)
            return JSONResponse(circuit, status_code=200)

        message = f"circuit_id {circuit_id} not found"
        log.debug("get_circuit result %s %s", message, 404)
        raise HTTPException(404, detail=message)
204
205
    # pylint: disable=too-many-branches, too-many-statements
206 1
    @rest("/v2/evc/", methods=["POST"])
    @validate_openapi(spec)
    def create_circuit(self, request: Request) -> JSONResponse:
        """Create a new circuit (EVC) from the request payload.

        Steps, in order:

        1. Build the EVC from the JSON body (400 on ``ValueError`` or
           ``KytosTagError``).
        2. Run ``check_disabled_component`` on both UNIs (409 on
           ``DisabledSwitch``).
        3. Validate ``primary_path`` and ``backup_path`` when present
           (400 on ``InvalidPath``).
        4. Require equal UNI tag lists and run
           ``_validate_has_primary_or_dynamic`` (400 on failure).
        5. Reserve the UNI tags (400 on ``KytosTagError``) and persist the
           EVC (400 on ``ValidationError``).
        6. Buffer the EVC, register it with the scheduler and, when it has
           no ``circuit_scheduler``, deploy it right away.
        7. Emit the ``created`` event and answer 201 with the circuit id.
        """
        log.debug("create_circuit /v2/evc/")
        data = get_json_or_400(request, self.controller.loop)

        # Try to create the circuit object
        try:
            evc = self._evc_from_dict(data)
        except (ValueError, KytosTagError) as exception:
            log.debug("create_circuit result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception

        try:
            check_disabled_component(evc.uni_a, evc.uni_z)
        except DisabledSwitch as exception:
            log.debug("create_circuit result %s %s", exception, 409)
            raise HTTPException(
                409, detail=f"Path is not valid: {exception}"
            ) from exception

        # Both static paths go through the same validation; the attribute
        # name doubles as the prefix of the error message.
        for path_name in ("primary_path", "backup_path"):
            path = getattr(evc, path_name)
            if not path:
                continue
            try:
                path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise HTTPException(
                    400, detail=f"{path_name} is not valid: {exception}"
                ) from exception

        if not evc._tag_lists_equal():
            raise HTTPException(
                400, detail="UNI_A and UNI_Z tag lists should be the same."
            )

        try:
            evc._validate_has_primary_or_dynamic()
        except ValueError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        try:
            self._use_uni_tags(evc)
        except KytosTagError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        # Persist the circuit
        try:
            evc.sync()
        except ValidationError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        # Keep it in the in-memory buffer and schedule its deploy
        self.circuits[evc.id] = evc
        self.sched.add(evc)

        # Circuit has no schedule, deploy now
        if not evc.circuit_scheduler:
            with evc.lock:
                evc.deploy()

        # Notify users
        result = {"circuit_id": evc.id}
        log.debug("create_circuit result %s %s", result, 201)
        emit_event(self.controller, name="created",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=201)
313
314 1
    @staticmethod
    def _use_uni_tags(evc):
        """Reserve the VLAN of both UNIs of ``evc``.

        When reserving for UNI Z raises ``KytosTagError``, the VLAN already
        taken for UNI A is made available again before the error is
        re-raised.
        """
        first_uni = evc.uni_a
        evc._use_uni_vlan(first_uni)
        try:
            evc._use_uni_vlan(evc.uni_z)
        except KytosTagError:
            evc.make_uni_vlan_available(first_uni)
            raise
324
325 1
    @listen_to('kytos/flow_manager.flow.removed')
    def on_flow_delete(self, event):
        """Listen to flow removal events and pass them on.

        The actual work is done by ``handle_flow_delete``.
        """
        self.handle_flow_delete(event)
329
330 1
    def handle_flow_delete(self, event):
        """Record on the owning EVC that one of its flows was removed.

        The EVC is looked up by the id derived from the flow cookie; the
        event is ignored when no buffered EVC matches.
        """
        cookie = event.content["flow"].cookie
        evc = self.circuits.get(EVC.get_id_from_cookie(cookie))
        if not evc:
            return
        log.debug("Flow removed in EVC %s", evc.id)
        evc.set_flow_removed_at()
337
338 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update(self, request: Request) -> JSONResponse:
        """Update a circuit based on payload.

        The EVC attributes (creation_time, active, current_path,
        failover_path, _id, archived) can't be updated.

        After the attributes are updated, an active EVC is removed when
        the update disables it, or removed and deployed again when a
        redeploy is requested; an inactive EVC is deployed when the update
        enables it. An ``updated`` event is emitted on success.

        Raises:
            HTTPException: 404 when the circuit is not buffered, 409 when
                it is archived or ``DisabledSwitch`` is raised, 400 when
                the payload is rejected (``ValueError``, ``KytosTagError``
                or ``ValidationError``).
        """
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = request.path_params["circuit_id"]
        log.debug("update /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("update result %s %s", result, 404)
            # Chain from the caught instance so the failed key stays in
            # the traceback (chaining from the class discards it).
            raise HTTPException(404, detail=result) from error

        if evc.archived:
            result = "Can't update archived EVC"
            log.debug("update result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        try:
            enable, redeploy = evc.update(
                **self._evc_dict_with_instances(data)
            )
        except (ValueError, KytosTagError, ValidationError) as exception:
            log.debug("update result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception
        except DisabledSwitch as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(
                409,
                detail=f"Path is not valid: {exception}"
            ) from exception

        if evc.is_active():
            if enable is False:  # disable if active
                with evc.lock:
                    evc.remove()
            elif redeploy is not None:  # redeploy if active
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        elif enable is True:  # enable if inactive
            with evc.lock:
                evc.deploy()
        result = {evc.id: evc.as_dict()}
        status = 200

        log.debug("update result %s %s", result, status)
        emit_event(self.controller, "updated",
                   content=map_evc_event_content(evc, **data))
        return JSONResponse(result, status_code=status)
394
395 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
    def delete_circuit(self, request: Request) -> JSONResponse:
        """Remove a circuit.

        While holding the EVC lock: the current and failover flows are
        removed, the EVC is deactivated and disabled, it is dropped from
        the scheduler, archived, its UNI tags are released and the result
        is synced. A ``deleted`` event is emitted afterwards.

        Raises:
            HTTPException: 404 when the circuit is not buffered or is
                already archived.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("delete_circuit result %s %s", result, 404)
            # Chain from the caught instance so the failed key stays in
            # the traceback (chaining from the class discards it).
            raise HTTPException(404, detail=result) from error

        if evc.archived:
            result = f"Circuit {circuit_id} already removed"
            log.debug("delete_circuit result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        log.info("Removing %s", evc)
        with evc.lock:
            evc.remove_current_flows()
            evc.remove_failover_flows(sync=False)
            evc.deactivate()
            evc.disable()
            self.sched.remove(evc)
            evc.archive()
            evc.remove_uni_tags()
            evc.sync()
        log.info("EVC removed. %s", evc)
        result = {"response": f"Circuit {circuit_id} removed"}
        status = 200

        log.debug("delete_circuit result %s %s", result, status)
        emit_event(self.controller, "deleted",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)
434
435 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
    def get_metadata(self, request: Request) -> JSONResponse:
        """Return the metadata of the buffered EVC with the given id.

        Raises:
            HTTPException: 404 when the circuit is not buffered.
        """
        circuit_id = request.path_params["circuit_id"]
        try:
            metadata = self.circuits[circuit_id].metadata
        except KeyError as error:
            raise HTTPException(
                404, detail=f"circuit_id {circuit_id} not found."
            ) from error
        return JSONResponse({"metadata": metadata})
448
449 1 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
    @validate_openapi(spec)
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
        """Add the same metadata to several EVCs at once.

        ``circuit_ids`` is taken out of the payload; what remains is the
        metadata. Storage is updated first, then each buffered EVC.

        Raises:
            HTTPException: 404 listing the ids that hit a ``KeyError``.
        """
        data = get_json_or_400(request, self.controller.loop)
        circuit_ids = data.pop("circuit_ids")

        self.mongo_controller.update_evcs(circuit_ids, data, "add")

        missing_ids = []
        for circuit_id in circuit_ids:
            try:
                self.circuits[circuit_id].extend_metadata(data)
            except KeyError:
                missing_ids.append(circuit_id)

        if not missing_ids:
            return JSONResponse("Operation successful", status_code=201)
        raise HTTPException(404, detail=missing_ids)
469
470 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
    @validate_openapi(spec)
    def add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to an EVC.

        The JSON body must be an object; it is merged into the EVC
        metadata and the EVC is synced.

        Raises:
            HTTPException: 400 when the body is not a JSON object, 404
                when the circuit is not buffered.
        """
        circuit_id = request.path_params["circuit_id"]
        metadata = get_json_or_400(request, self.controller.loop)
        if not isinstance(metadata, dict):
            # f-string so the offending value is actually interpolated.
            raise HTTPException(
                400,
                detail=f"Invalid metadata value: {metadata}"
            )
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error

        evc.extend_metadata(metadata)
        evc.sync()
        return JSONResponse("Operation successful", status_code=201)
489
490 1 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
    @validate_openapi(spec)
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
        """Delete one metadata key from several EVCs at once.

        Storage is updated first, then each buffered EVC.

        Raises:
            HTTPException: 404 listing the ids that hit a ``KeyError``.
        """
        data = get_json_or_400(request, self.controller.loop)
        key = request.path_params["key"]
        circuit_ids = data.pop("circuit_ids")
        self.mongo_controller.update_evcs(circuit_ids, {key: ""}, "del")

        missing_ids = []
        for circuit_id in circuit_ids:
            try:
                self.circuits[circuit_id].remove_metadata(key)
            except KeyError:
                missing_ids.append(circuit_id)

        if not missing_ids:
            return JSONResponse("Operation successful")
        raise HTTPException(404, detail=missing_ids)
510
511 1
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
    def delete_metadata(self, request: Request) -> JSONResponse:
        """Remove one metadata key from an EVC and sync the EVC.

        Raises:
            HTTPException: 404 when the circuit is not buffered.
        """
        path_params = request.path_params
        circuit_id = path_params["circuit_id"]
        key = path_params["key"]
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            not_found = f"circuit_id {circuit_id} not found."
            raise HTTPException(404, detail=not_found) from error

        evc.remove_metadata(key)
        evc.sync()
        return JSONResponse("Operation successful")
527
528 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
    def redeploy(self, request: Request) -> JSONResponse:
        """Endpoint to force the redeployment of an EVC.

        An enabled EVC has its current flows removed and is deployed again
        (202). A disabled EVC is left untouched (409).

        Raises:
            HTTPException: 404 when the circuit is not buffered.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            # Chain from the caught instance so the failed key stays in
            # the traceback (chaining from the class discards it).
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found"
            ) from error
        if evc.is_enabled():
            with evc.lock:
                evc.remove_current_flows()
                evc.deploy()
            result = {"response": f"Circuit {circuit_id} redeploy received."}
            status = 202
        else:
            result = {"response": f"Circuit {circuit_id} is disabled."}
            status = 409

        return JSONResponse(result, status_code=status)
551
552 1
    @rest("/v2/evc/schedule/", methods=["POST"])
    @validate_openapi(spec)
    def create_schedule(self, request: Request) -> JSONResponse:
        """
        Create a new schedule for a given circuit.

        This service does not check for conflicts with other schedules.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }

        Raises:
            HTTPException: 404 when the circuit is not found, 409 when it
                is archived.
        """
        log.debug("create_schedule /v2/evc/schedule/")
        payload = get_json_or_400(request, self.controller.loop)
        circuit_id = payload["circuit_id"]
        schedule_data = payload["schedule"]

        # Look the EVC up in the circuits buffer
        evc = self._get_circuits_buffer().get(circuit_id)
        if not evc:
            message = f"circuit_id {circuit_id} not found"
            log.debug("create_schedule result %s %s", message, 404)
            raise HTTPException(404, detail=message)

        # Deleted/archived circuits can not be modified
        if evc.archived:
            message = f"Circuit {circuit_id} is archived. Update is forbidden."
            log.debug("create_schedule result %s %s", message, 409)
            raise HTTPException(409, detail=message)

        new_schedule = CircuitSchedule.from_dict(schedule_data)

        # Start the schedule list when the EVC has none yet
        if not evc.circuit_scheduler:
            evc.circuit_scheduler = []
        evc.circuit_scheduler.append(new_schedule)

        # Register the job, then save the circuit to mongodb
        self.sched.add_circuit_job(evc, new_schedule)
        evc.sync()

        result = new_schedule.as_dict()
        log.debug("create_schedule result %s %s", result, 201)
        return JSONResponse(result, status_code=201)
613
614 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update_schedule(self, request: Request) -> JSONResponse:
        """Replace a schedule of an EVC, keeping the schedule id.

        A new schedule is built from the payload, given the id of the one
        it replaces, and swapped in; the old job is cancelled and a new one
        is registered before the EVC is synced.
        Payload example:
            {
              "date": "2019-08-07T14:52:10.967Z",
              "interval": "string",
              "frequency": "1 * * *",
              "action": "create"
            }

        Raises:
            HTTPException: 404 when the schedule is not found, 409 when its
                circuit is archived.
        """
        payload = get_json_or_400(request, self.controller.loop)
        schedule_id = request.path_params["schedule_id"]
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)

        # Locate the schedule and the circuit that owns it
        evc, old_schedule = self._find_evc_by_schedule_id(schedule_id)
        if not old_schedule:
            message = f"schedule_id {schedule_id} not found"
            log.debug("update_schedule result %s %s", message, 404)
            raise HTTPException(404, detail=message)

        # Deleted/archived circuits can not be modified
        if evc.archived:
            message = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("update_schedule result %s %s", message, 409)
            raise HTTPException(409, detail=message)

        new_schedule = CircuitSchedule.from_dict(payload)
        new_schedule.id = old_schedule.id

        # Swap the old schedule for the modified one
        evc.circuit_scheduler.remove(old_schedule)
        evc.circuit_scheduler.append(new_schedule)

        # Swap the scheduled jobs as well
        self.sched.cancel_job(old_schedule.id)
        self.sched.add_circuit_job(evc, new_schedule)

        # Save EVC to mongodb
        evc.sync()

        result = new_schedule.as_dict()
        log.debug("update_schedule result %s %s", result, 200)
        return JSONResponse(result, status_code=200)
665
666 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
    def delete_schedule(self, request: Request) -> JSONResponse:
        """Remove a circuit schedule.

        The schedule is removed from its EVC, its job is cancelled and the
        EVC is synced.

        Raises:
            HTTPException: 404 when the schedule is not found, 409 when its
                circuit is archived.
        """
        schedule_id = request.path_params["schedule_id"]
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
        evc, old_schedule = self._find_evc_by_schedule_id(schedule_id)

        if not old_schedule:
            message = f"schedule_id {schedule_id} not found"
            log.debug("delete_schedule result %s %s", message, 404)
            raise HTTPException(404, detail=message)

        # Deleted/archived circuits can not be modified
        if evc.archived:
            message = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("delete_schedule result %s %s", message, 409)
            raise HTTPException(409, detail=message)

        # Drop the schedule, cancel its job, then save EVC to mongodb
        evc.circuit_scheduler.remove(old_schedule)
        self.sched.cancel_job(old_schedule.id)
        evc.sync()

        result = "Schedule removed"
        log.debug("delete_schedule result %s %s", result, 200)
        return JSONResponse(result, status_code=200)
702
703 1
    @listen_to("kytos/topology.link_up")
    def on_link_up(self, event):
        """Listen to topology link_up events and pass them on.

        The actual work is done by ``handle_link_up``.
        """
        self.handle_link_up(event)
707
708 1
    def handle_link_up(self, event):
        """Let every enabled, non-archived EVC react to a link going up.

        EVCs are visited in service-level order and each one handles the
        link while holding its own lock.
        """
        link = event.content["link"]
        log.info("Event handle_link_up %s", link)
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_enabled() or evc.archived:
                continue
            with evc.lock:
                evc.handle_link_up(link)
715
716
    # Possibly replace this with interruptions?
717 1
    @listen_to(
        '.*.switch.interface.(link_up|link_down|created|deleted)',
        pool='dynamic_single'
    )
    def on_interface_link_change(self, event: KytosEvent):
        """
        Handler for interface link_up, link_down, created and deleted events

        ``link_up`` and ``created`` are treated as the interface coming up,
        ``link_down`` and ``deleted`` as the interface going down. The
        event is processed while holding ``self._lock``.
        """
        with self._lock:
            # The event type is whatever follows the last dot of the name.
            event_type = event.name.rsplit('.', 1)[-1]
            iface = event.content.get("interface")
            dispatch = {
                'link_up': self.handle_interface_link_up,
                'created': self.handle_interface_link_up,
                'link_down': self.handle_interface_link_down,
                'deleted': self.handle_interface_link_down,
            }
            handler = dispatch.get(event_type)
            if handler is not None:
                handler(iface)
732
733 1
    def handle_interface_link_up(self, interface):
        """Notify every EVC that an interface came up.

        :param interface: Interface taken from the event content; it is
            forwarded unchanged to each EVC's handle_interface_link_up.
        """
        # The message does not depend on the EVC, so it is logged once
        # per event instead of once per EVC.
        log.info("Event handle_interface_link_up %s", interface)
        for evc in self.get_evcs_by_svc_level():
            evc.handle_interface_link_up(interface)
742
743 1
    def handle_interface_link_down(self, interface):
        """Notify every EVC that an interface went down.

        :param interface: Interface taken from the event content; it is
            forwarded unchanged to each EVC's handle_interface_link_down.
        """
        # The message does not depend on the EVC, so it is logged once
        # per event instead of once per EVC.
        log.info("Event handle_interface_link_down %s", interface)
        for evc in self.get_evcs_by_svc_level():
            evc.handle_interface_link_down(interface)
752
753 1
    @listen_to("kytos/topology.link_down")
    def on_link_down(self, event):
        """Forward a topology link_down event to handle_link_down."""
        self.handle_link_down(event)
757
758 1
    def handle_link_down(self, event):  # pylint: disable=too-many-branches
        """Change circuits when a link is down or under maintenance.

        EVCs are sorted into three groups:

        - affected by the link, with a failover path that is not hit by
          the same link and whose flows could be built: the failover
          flows are sent to flow_manager in batches, then current and
          failover paths are swapped and "redeployed_link_down" is
          emitted;
        - affected by the link, but without such a failover path: an
          "evc_affected_by_link_down" event is emitted for each one;
        - not affected by the link: the failover path is set up again
          when the link was part of it.
        """
        link = event.content["link"]
        log.info("Event handle_link_down %s", link)
        flows_by_dpid = {}
        failover_evcs = []
        regular_evcs = []
        unaffected_evcs = []
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_affected_by_link(link):
                unaffected_evcs.append(evc)
                continue
            # Without a failover path (or with one that is also hit by
            # this link) the link down is handled the traditional way.
            if (
                not getattr(evc, 'failover_path', None) or
                evc.is_failover_path_affected_by_link(link)
            ):
                regular_evcs.append(evc)
                continue
            try:
                failover_flows = evc.get_failover_flows()
            # pylint: disable=broad-except
            except Exception:
                err = traceback.format_exc().replace("\n", ", ")
                log.error(
                    f"Ignore Failover path for {evc} due to error: {err}"
                )
                regular_evcs.append(evc)
                continue
            for dpid, flows in failover_flows.items():
                flows_by_dpid.setdefault(dpid, []).extend(flows)
            failover_evcs.append(evc)

        # Send the failover flows in rounds: each round emits at most
        # BATCH_SIZE flows per switch (everything at once when BATCH_SIZE
        # is falsy), then waits BATCH_INTERVAL.
        while flows_by_dpid:
            batch_size = settings.BATCH_SIZE or None
            for dpid in list(flows_by_dpid):
                pending = flows_by_dpid[dpid]
                emit_event(
                    self.controller,
                    context="kytos.flow_manager",
                    name="flows.install",
                    content={
                        "dpid": dpid,
                        "flow_dict": {"flows": pending[:batch_size]},
                    }
                )
                if batch_size is None or batch_size >= len(pending):
                    del flows_by_dpid[dpid]
                else:
                    flows_by_dpid[dpid] = pending[batch_size:]
            time.sleep(settings.BATCH_INTERVAL)

        # The failover path becomes the current one and vice versa.
        for evc in failover_evcs:
            with evc.lock:
                previous_path = evc.current_path
                evc.current_path = evc.failover_path
                evc.failover_path = previous_path
                evc.sync()
            emit_event(self.controller, "redeployed_link_down",
                       content=map_evc_event_content(evc))
            log.info(
                f"{evc} redeployed with failover due to link down {link.id}"
            )

        for evc in regular_evcs:
            emit_event(
                self.controller,
                "evc_affected_by_link_down",
                content={"link_id": link.id} | map_evc_event_content(evc)
            )

        # After handling the hot path, check if new failover paths are
        # needed. EVCs affected by link down will generate a KytosEvent
        # for deployed|redeployed, which will trigger the failover path
        # setup, so only the unaffected EVCs need a further check here.
        for evc in unaffected_evcs:
            if evc.is_failover_path_affected_by_link(link):
                with evc.lock:
                    evc.setup_failover_path()
839
840 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
    def on_evc_affected_by_link_down(self, event):
        """Forward the event to handle_evc_affected_by_link_down."""
        self.handle_evc_affected_by_link_down(event)
844
845 1
    def handle_evc_affected_by_link_down(self, event):
        """Handle the link down for one EVC and report the outcome.

        Emits "redeployed_link_down" when evc.handle_link_down() returns
        a truthy value and "error_redeploy_link_down" otherwise. Nothing
        is done when the event's "evc_id" is not in self.circuits.
        """
        content = event.content
        evc = self.circuits.get(content["evc_id"])
        link_id = content['link_id']
        if not evc:
            return
        with evc.lock:
            redeployed = evc.handle_link_down()
        if redeployed:
            log.info(f"{evc} redeployed due to link down {link_id}")
            event_name = "redeployed_link_down"
        else:
            event_name = "error_redeploy_link_down"
        emit_event(self.controller, event_name,
                   content=map_evc_event_content(evc))
859
860 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
    def on_evc_deployed(self, event):
        """Forward deployed|redeployed_link_(up|down) to the handler."""
        self.handle_evc_deployed(event)
864
865 1
    def handle_evc_deployed(self, event):
        """Set up the failover path of the EVC named in the event.

        Events whose "evc_id" is not in self.circuits are ignored. The
        setup runs while the EVC lock is held.
        """
        evc = self.circuits.get(event.content["evc_id"])
        if evc:
            with evc.lock:
                evc.setup_failover_path()
872
873 1
    @listen_to("kytos/topology.topology_loaded")
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
        """Load all EVCs when the topology_loaded event arrives."""
        self.load_all_evcs()
877
878 1
    def load_all_evcs(self):
        """Load every stored EVC that is not in memory yet.

        Once done, emits "evcs_loaded" with all the stored circuits
        (including the ones that were already in memory) as content.
        """
        stored = self.mongo_controller.get_circuits()['circuits'].items()
        for circuit_id, circuit in stored:
            if circuit_id in self.circuits:
                continue
            self._load_evc(circuit)
        emit_event(self.controller, "evcs_loaded", content=dict(stored))
885
886 1
    def _load_evc(self, circuit_dict):
887
        """Load one EVC from mongodb to memory."""
888 1
        try:
889 1
            evc = self._evc_from_dict(circuit_dict)
890 1
        except (ValueError, KytosTagError) as exception:
891 1
            log.error(
892
                f"Could not load EVC: dict={circuit_dict} error={exception}"
893
            )
894 1
            return None
895 1
        if evc.archived:
896 1
            return None
897
898 1
        self.circuits.setdefault(evc.id, evc)
899 1
        self.sched.add(evc)
900 1
        return evc
901
902 1
    @listen_to("kytos/flow_manager.flow.error")
    def on_flow_mod_error(self, event):
        """Forward a flow_manager flow error to handle_flow_mod_error."""
        self.handle_flow_mod_error(event)
906
907 1
    def handle_flow_mod_error(self, event):
        """Remove the current flows of the EVC owning a failed flow add.

        Only errors whose "error_command" is "add" are handled. The EVC
        is looked up from the flow cookie; unknown EVCs are ignored.
        """
        content = event.content
        flow = content["flow"]
        if content.get("error_command") != "add":
            return
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
        if not evc:
            return
        with evc.lock:
            evc.remove_current_flows()
917
918 1
    def _evc_dict_with_instances(self, evc_dict):
919
        """Convert some dict values to instance of EVC classes.
920
921
        This method will convert: [UNI, Link]
922
        """
923 1
        data = evc_dict.copy()  # Do not modify the original dict
924 1
        for attribute, value in data.items():
925
            # Get multiple attributes.
926
            # Ex: uni_a, uni_z
927 1
            if "uni" in attribute:
928 1
                try:
929 1
                    data[attribute] = self._uni_from_dict(value)
930 1
                except ValueError as exception:
931 1
                    result = "Error creating UNI: Invalid value"
932 1
                    raise ValueError(result) from exception
933
934 1
            if attribute == "circuit_scheduler":
935 1
                data[attribute] = []
936 1
                for schedule in value:
937 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
938
939
            # Get multiple attributes.
940
            # Ex: primary_links,
941
            #     backup_links,
942
            #     current_links_cache,
943
            #     primary_links_cache,
944
            #     backup_links_cache
945 1
            if "links" in attribute:
946 1
                data[attribute] = [
947
                    self._link_from_dict(link) for link in value
948
                ]
949
950
            # Ex: current_path,
951
            #     primary_path,
952
            #     backup_path
953 1
            if "path" in attribute and attribute != "dynamic_backup_path":
954 1
                data[attribute] = Path(
955
                    [self._link_from_dict(link) for link in value]
956
                )
957
958 1
        return data
959
960 1
    def _evc_from_dict(self, evc_dict):
        """Create an EVC from a dict, passing the NApp's table_group.

        The dict values are first converted by _evc_dict_with_instances;
        any "table_group" in evc_dict is replaced by self.table_group.
        """
        kwargs = self._evc_dict_with_instances(evc_dict)
        kwargs["table_group"] = self.table_group
        return EVC(self.controller, **kwargs)
964
965 1
    def _uni_from_dict(self, uni_dict):
966
        """Return a UNI object from python dict."""
967 1
        if uni_dict is None:
968 1
            return False
969
970 1
        interface_id = uni_dict.get("interface_id")
971 1
        interface = self.controller.get_interface_by_id(interface_id)
972 1
        if interface is None:
973 1
            result = (
974
                "Error creating UNI:"
975
                + f"Could not instantiate interface {interface_id}"
976
            )
977 1
            raise ValueError(result) from ValueError
978 1
        tag_convert = {1: "vlan"}
979 1
        tag_dict = uni_dict.get("tag", None)
980 1
        if tag_dict:
981 1
            tag_type = tag_dict.get("tag_type")
982 1
            tag_type = tag_convert.get(tag_type, tag_type)
983 1
            tag_value = tag_dict.get("value")
984 1
            if isinstance(tag_value, list):
985 1
                tag_value = get_tag_ranges(tag_value)
986 1
                mask_list = get_vlan_tags_and_masks(tag_value)
987 1
                tag = TAGRange(tag_type, tag_value, mask_list)
988
            else:
989 1
                tag = TAG(tag_type, tag_value)
990
        else:
991 1
            tag = None
992 1
        uni = UNI(interface, tag)
993 1
        return uni
994
995 1
    def _link_from_dict(self, link_dict):
996
        """Return a Link object from python dict."""
997 1
        id_a = link_dict.get("endpoint_a").get("id")
998 1
        id_b = link_dict.get("endpoint_b").get("id")
999
1000 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
1001 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
1002 1
        if not endpoint_a:
1003 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
1004 1
            raise ValueError(error_msg)
1005 1
        if not endpoint_b:
1006
            error_msg = f"Could not get interface endpoint_b id {id_b}"
1007
            raise ValueError(error_msg)
1008
1009 1
        link = Link(endpoint_a, endpoint_b)
1010 1
        if "metadata" in link_dict:
1011 1
            link.extend_metadata(link_dict.get("metadata"))
1012
1013 1
        s_vlan = link.get_metadata("s_vlan")
1014 1
        if s_vlan:
1015 1
            tag = TAG.from_dict(s_vlan)
1016 1
            if tag is False:
1017
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
1018
                raise ValueError(error_msg)
1019 1
            link.update_metadata("s_vlan", tag)
1020 1
        return link
1021
1022 1
    def _find_evc_by_schedule_id(self, schedule_id):
1023
        """
1024
        Find an EVC and CircuitSchedule based on schedule_id.
1025
1026
        :param schedule_id: Schedule ID
1027
        :return: EVC and Schedule
1028
        """
1029 1
        circuits = self._get_circuits_buffer()
1030 1
        found_schedule = None
1031 1
        evc = None
1032
1033
        # pylint: disable=unused-variable
1034 1
        for c_id, circuit in circuits.items():
1035 1
            for schedule in circuit.circuit_scheduler:
1036 1
                if schedule.id == schedule_id:
1037 1
                    found_schedule = schedule
1038 1
                    evc = circuit
1039 1
                    break
1040 1
            if found_schedule:
1041 1
                break
1042 1
        return evc, found_schedule
1043
1044 1
    def _get_circuits_buffer(self):
1045
        """
1046
        Return the circuit buffer.
1047
1048
        If the buffer is empty, try to load data from mongodb.
1049
        """
1050 1
        if not self.circuits:
1051
            # Load circuits from mongodb to buffer
1052 1
            circuits = self.mongo_controller.get_circuits()['circuits']
1053 1
            for c_id, circuit in circuits.items():
1054 1
                evc = self._evc_from_dict(circuit)
1055 1
                self.circuits[c_id] = evc
1056 1
        return self.circuits
1057
1058
    # pylint: disable=attribute-defined-outside-init
1059 1
    @alisten_to("kytos/of_multi_table.enable_table")
    async def on_table_enabled(self, event):
        """Apply the table groups sent for mef_eline by of_multi_table.

        The event is ignored when it carries no "mef_eline" content. If
        any group is not in settings.TABLE_GROUP_ALLOWED an error is
        logged and nothing is changed. Otherwise self.table_group is
        updated and "kytos/mef_eline.enable_table" is emitted with it.
        """
        table_group = event.content.get("mef_eline", None)
        if not table_group:
            return
        for group in table_group:
            if group in settings.TABLE_GROUP_ALLOWED:
                continue
            log.error(f'The table group "{group}" is not allowed for '
                      f'mef_eline. Allowed table groups are '
                      f'{settings.TABLE_GROUP_ALLOWED}')
            return
        self.table_group.update(table_group)
        await aemit_event(
            self.controller,
            "kytos/mef_eline.enable_table",
            {"group_table": self.table_group},
        )
1075