Passed
Pull Request — master (#424)
by
unknown
04:06
created

build.main.Main.shutdown()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 1
nop 1
dl 0
loc 2
ccs 1
cts 1
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from collections import defaultdict
10 1
from threading import Lock
11 1
from typing import Optional
12
13 1
from pydantic import ValidationError
14
15 1
from kytos.core import KytosNApp, log, rest
16 1
from kytos.core.events import KytosEvent
17 1
from kytos.core.exceptions import KytosTagError
18 1
from kytos.core.helpers import (alisten_to, listen_to, load_spec,
19
                                validate_openapi)
20 1
from kytos.core.interface import TAG, UNI, TAGRange
21 1
from kytos.core.link import Link
22 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
23
                                 get_json_or_400)
24 1
from kytos.core.tag_ranges import get_tag_ranges
25 1
from napps.kytos.mef_eline import controllers, settings
26 1
from napps.kytos.mef_eline.exceptions import (DisabledSwitch,
27
                                              DuplicatedNoTagUNI, InvalidPath)
28 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
29
                                          Path)
30 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
31 1
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
32
                                         emit_event, get_vlan_tags_and_masks,
33
                                         map_evc_event_content)
34
35
36
# pylint: disable=too-many-public-methods
37 1
class Main(KytosNApp):
38
    """Main class of amlight/mef_eline NApp.
39
40
    This class is the entry point for this napp.
41
    """
42
43 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
44
45 1
    def setup(self):
        """Initialize the NApp state; stands in for '__init__'.

        The controller calls this method automatically when the
        application is loaded, so every start-up routine belongs here.
        """
        # Object used to schedule circuit events.
        self.sched = Scheduler()

        # Object used to save and load circuits.
        eline_controller = self.get_eline_controller()
        self.mongo_controller = eline_controller
        eline_controller.bootstrap_indexes()

        # Set the controller that will manage the dynamic paths.
        DynamicPathManager.set_controller(self.controller)

        self._lock = Lock()
        self.table_group = {"epl": 0, "evpl": 0}
        self.lock_interfaces = defaultdict(Lock)
        self.intf_events = defaultdict(dict)

        # Buffer of the EVCs created so far. Every create/update/delete
        # made on it must also be synced to mongodb.
        self.circuits = {}

        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

        self.load_all_evcs()
        self._topology_updated_at = None
75
76 1
    def get_evcs_by_svc_level(self) -> list:
        """Return the buffered circuits ordered by priority.

        Circuits with a higher service level come first; circuits sharing
        a service level are ordered by ascending creation_time.

        In the future, as more ops are offloaded, this should be fetched
        from the DB.
        """
        def priority(evc):
            return -evc.service_level, evc.creation_time

        ordered = list(self.circuits.values())
        ordered.sort(key=priority)
        return ordered
83
84 1
    @staticmethod
    def get_eline_controller():
        """Instantiate and return the ELineController."""
        eline_controller = controllers.ELineController()
        return eline_controller
88
89 1
    def execute(self):
        """Run one round of the consistency routine.

        The round is skipped when another one still holds ``self._lock``,
        so rounds never overlap and a caller is never left waiting.
        """
        # Acquire without blocking so that "is a round running?" and the
        # acquisition are a single atomic step. Testing locked() and then
        # entering `with self._lock` could block if another caller took
        # the lock in between.
        if not self._lock.acquire(blocking=False):
            return
        try:
            log.debug("Starting consistency routine")
            self.execute_consistency()
        finally:
            self._lock.release()
        log.debug("Finished consistency routine")
97
98 1
    def should_be_checked(self, circuit):
        """Tell whether the circuit qualifies for the consistency check.

        A circuit qualifies when it is enabled but not active, its lock is
        free, it had no recent flow removal or update, its UNIs are active
        and it is either intra-switch or has a current_path.
        """
        if not circuit.is_enabled() or circuit.is_active():
            return False
        if circuit.lock.locked():
            return False
        if circuit.has_recent_removed_flow() or circuit.is_recent_updated():
            return False
        if not circuit.are_unis_active(self.controller.switches):
            return False
        # If an inter-switch EVC does not have current_path, it does not
        # make sense to run sdntrace on it.
        return bool(circuit.is_intra_switch() or circuit.current_path)
114
115 1
    def execute_consistency(self):
        """Execute the consistency routine.

        Circuits that pass ``should_be_checked`` are traced in bulk. Those
        whose trace succeeds are activated and synced; the others have
        their ``execution_rounds`` incremented and are redeployed once it
        exceeds ``settings.WAIT_FOR_OLD_PATH``. Circuits present in mongodb
        but missing from the buffer are loaded at the end.
        """
        pending = self.mongo_controller.get_circuits()['circuits']
        candidates = []
        for evc in self.get_evcs_by_svc_level():
            # Whatever is left in `pending` after this loop exists only in
            # mongodb and still has to be loaded.
            pending.pop(evc.id, None)
            if self.should_be_checked(evc):
                candidates.append(evc)

        trace_results = EVCDeploy.check_list_traces(candidates)
        for evc in candidates:
            if trace_results.get(evc.id):
                evc.execution_rounds = 0
                log.info(f"{evc} enabled but inactive - activating")
                with evc.lock:
                    evc.activate()
                    evc.sync()
                continue
            evc.execution_rounds += 1
            if evc.execution_rounds > settings.WAIT_FOR_OLD_PATH:
                log.info(f"{evc} enabled but inactive - redeploy")
                with evc.lock:
                    evc.deploy()

        for circuit_id, stored_circuit in pending.items():
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
            self._load_evc(stored_circuit)
141
142 1
    def shutdown(self):
        """Run when the NApp is unloaded.

        Nothing is cleaned up at the moment; any teardown procedure
        should be added here.
        """
147
148 1
    @rest("/v2/evc/", methods=["GET"])
    def list_circuits(self, request: Request) -> JSONResponse:
        """Endpoint to return the stored circuits.

        The ``archived`` query argument (lower-cased, "false" when absent)
        is forwarded to the mongo controller, so by default only
        non-archived EVCs are requested. Every other query argument is
        forwarded as a metadata filter.
        """
        log.debug("list_circuits /v2/evc")
        query = request.query_params
        archived = query.get("archived", "false").lower()
        metadata = {
            key: value for key, value in query.items() if key != "archived"
        }
        found = self.mongo_controller.get_circuits(archived=archived,
                                                   metadata=metadata)
        return JSONResponse(found['circuits'])
163
164 1
    @rest("/v2/evc/schedule", methods=["GET"])
    def list_schedules(self, _request: Request) -> JSONResponse:
        """Endpoint to return every schedule stored for every circuit.

        The response is a JSON list following this template:
        [{"schedule_id": <schedule_id>,
         "circuit_id": <circuit_id>,
         "schedule": <schedule object>}]

        When no circuit is stored at all, an empty JSON object is
        returned instead.
        """
        log.debug("list_schedules /v2/evc/schedule")
        stored = self.mongo_controller.get_circuits()['circuits'].values()
        if not stored:
            return JSONResponse({}, status_code=200)

        schedules = []
        for circuit in stored:
            # Circuits without any schedule contribute nothing.
            for schedule in circuit.get("circuit_scheduler") or ():
                schedules.append({
                    "schedule_id": schedule.get("id"),
                    "circuit_id": circuit.get("id"),
                    "schedule": schedule,
                })

        log.debug("list_schedules result %s %s", schedules, 200)
        return JSONResponse(schedules, status_code=200)
195
196 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
    def get_circuit(self, request: Request) -> JSONResponse:
        """Endpoint to return the stored circuit with the given id.

        Raises:
            HTTPException: 404 when the mongo controller has no circuit
                for ``circuit_id``.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("get_circuit /v2/evc/%s", circuit_id)
        circuit = self.mongo_controller.get_circuit(circuit_id)
        if circuit:
            log.debug("get_circuit result %s %s", circuit, 200)
            return JSONResponse(circuit, status_code=200)

        not_found = f"circuit_id {circuit_id} not found"
        log.debug("get_circuit result %s %s", not_found, 404)
        raise HTTPException(404, detail=not_found)
209
210
    # pylint: disable=too-many-branches, too-many-statements
211 1
    @rest("/v2/evc/", methods=["POST"])
    @validate_openapi(spec)
    def create_circuit(self, request: Request) -> JSONResponse:
        """Try to create a new circuit from the request payload.

        Steps, in order:

        1. Build the EVC from the JSON payload.
        2. Reject the request if a UNI component is disabled.
        3. Validate primary_path and backup_path, when present.
        4. Require both UNIs to have equal tag lists and the EVC to have
           a primary or dynamic path.
        5. Reject duplicated UNIs with no tag.
        6. Take the UNI tags, sync the EVC, store it in the circuit
           buffer and add it to the scheduler.
        7. Deploy right away when the EVC has no circuit_scheduler.
        8. Emit the "created" event and answer 201 with the circuit id.

        Raises:
            HTTPException: 400 for an invalid payload, path, tag or
                model; 409 for a disabled switch or a duplicated
                no-tag UNI.
        """
        log.debug("create_circuit /v2/evc/")
        data = get_json_or_400(request, self.controller.loop)

        # Try to create the circuit object.
        try:
            evc = self._evc_from_dict(data)
        except (ValueError, KytosTagError) as err:
            log.debug("create_circuit result %s %s", err, 400)
            raise HTTPException(400, detail=str(err)) from err

        try:
            check_disabled_component(evc.uni_a, evc.uni_z)
        except DisabledSwitch as err:
            log.debug("create_circuit result %s %s", err, 409)
            raise HTTPException(
                409, detail=f"Path is not valid: {err}"
            ) from err

        if evc.primary_path:
            try:
                evc.primary_path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as err:
                raise HTTPException(
                    400, detail=f"primary_path is not valid: {err}"
                ) from err

        if evc.backup_path:
            try:
                evc.backup_path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as err:
                raise HTTPException(
                    400, detail=f"backup_path is not valid: {err}"
                ) from err

        if not evc._tag_lists_equal():
            raise HTTPException(
                400, detail="UNI_A and UNI_Z tag lists should be the same."
            )

        try:
            evc._validate_has_primary_or_dynamic()
        except ValueError as err:
            raise HTTPException(400, detail=str(err)) from err

        try:
            self._check_no_tag_duplication(evc.id, evc.uni_a, evc.uni_z)
        except DuplicatedNoTagUNI as err:
            log.debug("create_circuit result %s %s", err, 409)
            raise HTTPException(409, detail=str(err)) from err

        try:
            self._use_uni_tags(evc)
        except KytosTagError as err:
            raise HTTPException(400, detail=str(err)) from err

        # Save the circuit.
        try:
            evc.sync()
        except ValidationError as err:
            raise HTTPException(400, detail=str(err)) from err

        # Store the circuit in the buffer and schedule its deploy.
        self.circuits[evc.id] = evc
        self.sched.add(evc)

        # A circuit without a schedule is deployed right now.
        if not evc.circuit_scheduler:
            with evc.lock:
                evc.deploy()

        # Notify users.
        response = {"circuit_id": evc.id}
        log.debug("create_circuit result %s %s", response, 201)
        emit_event(self.controller, name="created",
                   content=map_evc_event_content(evc))
        return JSONResponse(response, status_code=201)
324
325 1
    @staticmethod
    def _use_uni_tags(evc):
        """Take the VLANs of both UNIs of the EVC into use.

        UNI A is handled first. If UNI Z then fails with KytosTagError,
        UNI A's VLAN is made available again before the error is
        re-raised.
        """
        first_uni = evc.uni_a
        evc._use_uni_vlan(first_uni)
        try:
            evc._use_uni_vlan(evc.uni_z)
        except KytosTagError:
            evc.make_uni_vlan_available(first_uni)
            raise
335
336 1
    @listen_to('kytos/flow_manager.flow.removed')
    def on_flow_delete(self, event):
        """Listen to flow removal events and pass them to the handler."""
        self.handle_flow_delete(event)
340
341 1
    def handle_flow_delete(self, event):
        """Record on the owning EVC that one of its flows was removed.

        The EVC id is derived from the flow cookie. Nothing happens when
        no buffered circuit matches that id.
        """
        cookie = event.content["flow"].cookie
        evc = self.circuits.get(EVC.get_id_from_cookie(cookie))
        if not evc:
            return
        log.debug("Flow removed in EVC %s", evc.id)
        evc.set_flow_removed_at()
348
349 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update(self, request: Request) -> JSONResponse:
        """Update a circuit based on payload.

        The EVC attributes (creation_time, active, current_path,
        failover_path, _id, archived) can't be updated.

        After the attributes are updated the EVC is removed and/or
        deployed according to the ``enable`` and ``redeploy`` values
        returned by ``evc.update`` and to whether the EVC is active.
        An "updated" event is emitted at the end.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer;
                409 when it is archived, has a duplicated no-tag UNI or
                a disabled switch; 400 for ValueError, KytosTagError or
                ValidationError.
        """
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = request.path_params["circuit_id"]
        log.debug("update /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("update result %s %s", result, 404)
            # Chain from the caught instance, not the KeyError class, so
            # the traceback shows the key that was actually missing.
            raise HTTPException(404, detail=result) from error

        if evc.archived:
            result = "Can't update archived EVC"
            log.debug("update result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        try:
            updated_data = self._evc_dict_with_instances(data)
            self._check_no_tag_duplication(
                circuit_id, updated_data.get("uni_a"),
                updated_data.get("uni_z")
            )
            enable, redeploy = evc.update(**updated_data)
        except (ValueError, KytosTagError, ValidationError) as exception:
            log.debug("update result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception
        except DuplicatedNoTagUNI as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(409, detail=str(exception)) from exception
        except DisabledSwitch as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(
                    409,
                    detail=f"Path is not valid: {exception}"
                ) from exception

        if evc.is_active():
            if enable is False:  # disable if active
                with evc.lock:
                    evc.remove()
            elif redeploy is not None:  # redeploy if active
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        else:
            if enable is True:  # enable if inactive
                with evc.lock:
                    evc.deploy()
            elif evc.is_enabled() and redeploy:
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        result = {evc.id: evc.as_dict()}
        status = 200

        log.debug("update result %s %s", result, status)
        emit_event(self.controller, "updated",
                   content=map_evc_event_content(evc, **data))
        return JSONResponse(result, status_code=status)
415
416 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
    def delete_circuit(self, request: Request) -> JSONResponse:
        """Remove a circuit.

        With the EVC lock held, the current and failover flows are
        removed, then the EVC is deactivated, disabled, dropped from the
        scheduler, archived, has its UNI tags removed and is synced.
        A "deleted" event is emitted afterwards.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer or
                is already archived.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("delete_circuit result %s %s", result, 404)
            # Chain from the caught instance, not the KeyError class, so
            # the traceback shows the key that was actually missing.
            raise HTTPException(404, detail=result) from error

        if evc.archived:
            result = f"Circuit {circuit_id} already removed"
            log.debug("delete_circuit result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        log.info("Removing %s", evc)
        with evc.lock:
            evc.remove_current_flows()
            evc.remove_failover_flows(sync=False)
            evc.deactivate()
            evc.disable()
            self.sched.remove(evc)
            evc.archive()
            evc.remove_uni_tags()
            evc.sync()
        log.info("EVC removed. %s", evc)
        result = {"response": f"Circuit {circuit_id} removed"}
        status = 200

        log.debug("delete_circuit result %s %s", result, status)
        emit_event(self.controller, "deleted",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)
455
456 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
    def get_metadata(self, request: Request) -> JSONResponse:
        """Return the metadata of a buffered EVC.

        Raises:
            HTTPException: 404 when a KeyError is raised while looking
                the circuit up.
        """
        circuit_id = request.path_params["circuit_id"]
        try:
            payload = {"metadata": self.circuits[circuit_id].metadata}
            return JSONResponse(payload)
        except KeyError as missing:
            raise HTTPException(
                404, detail=f"circuit_id {circuit_id} not found."
            ) from missing
469
470 1 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
    @validate_openapi(spec)
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to several EVCs at once.

        The payload carries ``circuit_ids`` plus the metadata entries.
        The entries are first written through the mongo controller and
        then applied to every circuit found in the buffer.

        Raises:
            HTTPException: 404 listing the ids that are not in the
                circuit buffer.
        """
        metadata = get_json_or_400(request, self.controller.loop)
        circuit_ids = metadata.pop("circuit_ids")

        self.mongo_controller.update_evcs(circuit_ids, metadata, "add")

        missing = []
        for evc_id in circuit_ids:
            try:
                self.circuits[evc_id].extend_metadata(metadata)
            except KeyError:
                missing.append(evc_id)

        if missing:
            raise HTTPException(404, detail=missing)
        return JSONResponse("Operation successful", status_code=201)
490
491 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
    @validate_openapi(spec)
    def add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to an EVC.

        The JSON payload must be an object; its entries extend the EVC
        metadata and the EVC is then synced.

        Raises:
            HTTPException: 400 when the payload is not a dict; 404 when
                the circuit is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        metadata = get_json_or_400(request, self.controller.loop)
        if not isinstance(metadata, dict):
            # The f prefix is required: without it the client receives
            # the literal text "{metadata}" instead of the bad value.
            raise HTTPException(
                400, detail=f"Invalid metadata value: {metadata}"
            )
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error

        evc.extend_metadata(metadata)
        evc.sync()
        return JSONResponse("Operation successful", status_code=201)
510
511 1 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
    @validate_openapi(spec)
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
        """Delete a metadata key from several EVCs at once.

        The key comes from the path and the ``circuit_ids`` from the
        payload. The key is first removed through the mongo controller
        and then from every circuit found in the buffer.

        Raises:
            HTTPException: 404 listing the ids that are not in the
                circuit buffer.
        """
        payload = get_json_or_400(request, self.controller.loop)
        key = request.path_params["key"]
        circuit_ids = payload.pop("circuit_ids")
        self.mongo_controller.update_evcs(circuit_ids, {key: ""}, "del")

        missing = []
        for evc_id in circuit_ids:
            try:
                self.circuits[evc_id].remove_metadata(key)
            except KeyError:
                missing.append(evc_id)

        if missing:
            raise HTTPException(404, detail=missing)
        return JSONResponse("Operation successful")
531
532 1
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
    def delete_metadata(self, request: Request) -> JSONResponse:
        """Delete one metadata key from an EVC and sync the EVC.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer.
        """
        params = request.path_params
        circuit_id = params["circuit_id"]
        key = params["key"]
        try:
            evc = self.circuits[circuit_id]
        except KeyError as missing:
            raise HTTPException(
                404, detail=f"circuit_id {circuit_id} not found."
            ) from missing

        evc.remove_metadata(key)
        evc.sync()
        return JSONResponse("Operation successful")
548
549 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
    def redeploy(self, request: Request) -> JSONResponse:
        """Endpoint to force the redeployment of an EVC.

        An enabled EVC has its current flows removed and is deployed
        again under its lock (202). A disabled EVC is left untouched
        and reported with 409.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            # Chain from the caught instance, not the KeyError class, so
            # the traceback shows the key that was actually missing.
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found"
            ) from error
        if evc.is_enabled():
            with evc.lock:
                evc.remove_current_flows()
                evc.deploy()
            result = {"response": f"Circuit {circuit_id} redeploy received."}
            status = 202
        else:
            result = {"response": f"Circuit {circuit_id} is disabled."}
            status = 409

        return JSONResponse(result, status_code=status)
572
573 1
    @rest("/v2/evc/schedule/", methods=["POST"])
    @validate_openapi(spec)
    def create_schedule(self, request: Request) -> JSONResponse:
        """
        Create a new schedule for a given circuit.

        This service does not check whether the new schedule conflicts
        with another one.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }

        Raises:
            HTTPException: 404 when the circuit is not found; 409 when
                it is archived.
        """
        log.debug("create_schedule /v2/evc/schedule/")
        payload = get_json_or_400(request, self.controller.loop)
        circuit_id = payload["circuit_id"]
        schedule_data = payload["schedule"]

        # Look the EVC up in the circuits buffer.
        evc = self._get_circuits_buffer().get(circuit_id)
        if not evc:
            not_found = f"circuit_id {circuit_id} not found"
            log.debug("create_schedule result %s %s", not_found, 404)
            raise HTTPException(404, detail=not_found)

        # Archived (deleted) circuits can not be modified.
        if evc.archived:
            forbidden = (
                f"Circuit {circuit_id} is archived. Update is forbidden."
            )
            log.debug("create_schedule result %s %s", forbidden, 409)
            raise HTTPException(409, detail=forbidden)

        new_schedule = CircuitSchedule.from_dict(schedule_data)

        # Start the schedule list when the circuit has none yet.
        if not evc.circuit_scheduler:
            evc.circuit_scheduler = []
        evc.circuit_scheduler.append(new_schedule)

        # Register the job, then save the circuit to mongodb.
        self.sched.add_circuit_job(evc, new_schedule)
        evc.sync()

        created = new_schedule.as_dict()
        log.debug("create_schedule result %s %s", created, 201)
        return JSONResponse(created, status_code=201)
634
635 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update_schedule(self, request: Request) -> JSONResponse:
        """Update a schedule.

        Every attribute of the given schedule is replaced by the payload,
        while the schedule id is preserved.
        Payload example:
            {
              "date": "2019-08-07T14:52:10.967Z",
              "interval": "string",
              "frequency": "1 * * *",
              "action": "create"
            }

        Raises:
            HTTPException: 404 when the schedule is not found; 409 when
                its circuit is archived.
        """
        payload = get_json_or_400(request, self.controller.loop)
        schedule_id = request.path_params["schedule_id"]
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)

        # Find the circuit that owns the schedule.
        evc, old_schedule = self._find_evc_by_schedule_id(schedule_id)
        if not old_schedule:
            not_found = f"schedule_id {schedule_id} not found"
            log.debug("update_schedule result %s %s", not_found, 404)
            raise HTTPException(404, detail=not_found)

        # Archived (deleted) circuits can not be modified.
        if evc.archived:
            forbidden = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("update_schedule result %s %s", forbidden, 409)
            raise HTTPException(409, detail=forbidden)

        new_schedule = CircuitSchedule.from_dict(payload)
        new_schedule.id = old_schedule.id

        # Swap the old schedule for the modified one.
        evc.circuit_scheduler.remove(old_schedule)
        evc.circuit_scheduler.append(new_schedule)

        # Replace the scheduled job, then save the EVC to mongodb.
        self.sched.cancel_job(old_schedule.id)
        self.sched.add_circuit_job(evc, new_schedule)
        evc.sync()

        updated = new_schedule.as_dict()
        log.debug("update_schedule result %s %s", updated, 200)
        return JSONResponse(updated, status_code=200)
686
687 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
    def delete_schedule(self, request: Request) -> JSONResponse:
        """Remove a circuit schedule.

        The schedule is removed from its EVC, its scheduled job is
        cancelled and the EVC is saved to mongodb.

        Raises:
            HTTPException: 404 when the schedule is not found; 409 when
                its circuit is archived.
        """
        schedule_id = request.path_params["schedule_id"]
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
        evc, old_schedule = self._find_evc_by_schedule_id(schedule_id)

        if not old_schedule:
            not_found = f"schedule_id {schedule_id} not found"
            log.debug("delete_schedule result %s %s", not_found, 404)
            raise HTTPException(404, detail=not_found)

        # Archived (deleted) circuits can not be modified.
        if evc.archived:
            forbidden = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("delete_schedule result %s %s", forbidden, 409)
            raise HTTPException(409, detail=forbidden)

        evc.circuit_scheduler.remove(old_schedule)
        self.sched.cancel_job(old_schedule.id)
        evc.sync()

        outcome = "Schedule removed"
        log.debug("delete_schedule result %s %s", outcome, 200)
        return JSONResponse(outcome, status_code=200)
723
724 1
    def _check_no_tag_duplication(
        self,
        evc_id: str,
        uni_a: Optional[UNI] = None,
        uni_z: Optional[UNI] = None
    ):
        """Check the given UNIs with no tag against the other circuits.

        Every non-archived circuit other than ``evc_id`` is asked, through
        ``check_no_tag_duplicate``, to verify each given UNI whose
        ``user_tag`` is None. Nothing is checked when no UNI is given or
        when every given UNI has a truthy ``user_tag``.
        Raise DuplicatedNoTagUNI if duplication is found.

        Args:
            evc_id (str): id of the EVC being analyzed; that circuit is
                skipped during the scan.
            uni_a (UNI): UNI A of the EVC, if any.
            uni_z (UNI): UNI Z of the EVC, if any.
        """
        # No UNIs
        if not (uni_a or uni_z):
            return

        has_untagged_uni = (
            (uni_a and not uni_a.user_tag)
            or (uni_z and not uni_z.user_tag)
        )
        if not has_untagged_uni:
            return

        for other in self.circuits.copy().values():
            if other.archived or other._id == evc_id:
                continue
            if uni_a and uni_a.user_tag is None:
                other.check_no_tag_duplicate(uni_a)
            if uni_z and uni_z.user_tag is None:
                other.check_no_tag_duplicate(uni_z)
749
750 1
    @listen_to("kytos/topology.link_up")
    def on_link_up(self, event):
        """Change circuit when link is up or end_maintenance.

        Listener for ``kytos/topology.link_up``; the event is forwarded
        unchanged to ``handle_link_up``, which does the actual work.
        """
        self.handle_link_up(event)
754
755 1
    def handle_link_up(self, event):
        """Notify every enabled, non-archived EVC that a link came up.

        The link is read from ``event.content["link"]`` and handed to each
        eligible EVC's ``handle_link_up`` while holding that EVC's lock.
        """
        link = event.content["link"]
        log.info("Event handle_link_up %s", link)
        for circuit in self.get_evcs_by_svc_level():
            if not circuit.is_enabled() or circuit.archived:
                continue
            with circuit.lock:
                circuit.handle_link_up(link)
762
763
    # Possibly replace this with interruptions?
764 1
    @listen_to(
        '.*.switch.interface.(link_up|link_down|created|deleted)'
    )
    def on_interface_link_change(self, event: KytosEvent):
        """Debounce interface state events before they reach the EVCs.

        Handles interface ``link_up``, ``link_down``, ``created`` and
        ``deleted`` events. To avoid multiple database updates (link flap):

        - Every interface is identified and tracked on its own, guarded by
          its entry in ``self.lock_interfaces``.
        - Once an interface event is received a timer is started.
        - While the timer is running only ``self.intf_events`` is updated.
        - After ``settings.TIME_INTERFACE`` has elapsed, the last received
          event is processed by ``handle_on_interface_link_change``.

        NOTE(review): this relies on ``self.intf_events`` and
        ``self.lock_interfaces`` creating entries on first access
        (presumably defaultdicts) - confirm where they are initialised.
        """
        iface = event.content.get("interface")
        with self.lock_interfaces[iface.id]:
            _now = event.timestamp
            # Return out of order events: a newer event is already stored.
            if (
                iface.id in self.intf_events
                and self.intf_events[iface.id]["event"].timestamp > _now
            ):
                return
            # Remember this event as the latest one for the interface.
            self.intf_events[iface.id].update({"event": event})
            # Another handler is already waiting for this interface; it
            # will pick up the event stored above once its wait is over.
            if self.intf_events[iface.id].get("time_lock"):
                return
            self.intf_events[iface.id].update({"time_lock": True})
        # The lock is released while waiting so that newer events for the
        # same interface can still replace the stored one.
        time.sleep(settings.TIME_INTERFACE)
        with self.lock_interfaces[iface.id]:
            # Process whichever event was stored last during the wait.
            event = self.intf_events[iface.id]["event"]
            self.intf_events[iface.id].update({"time_lock": False})
            self.handle_on_interface_link_change(event)
795
796 1
    def handle_on_interface_link_change(self, event: KytosEvent):
        """Route an interface event to the up or down handler.

        The last dot-separated component of ``event.name`` selects the
        handler: ``link_up``/``created`` go to ``handle_interface_link_up``
        and ``link_down``/``deleted`` go to ``handle_interface_link_down``.
        Any other event type is ignored.
        """
        event_type = event.name.rpartition('.')[2]
        interface = event.content.get("interface")
        dispatch = {
            'link_up': self.handle_interface_link_up,
            'created': self.handle_interface_link_up,
            'link_down': self.handle_interface_link_down,
            'deleted': self.handle_interface_link_down,
        }
        handler = dispatch.get(event_type)
        if handler is not None:
            handler(interface)
806
807 1
    def handle_interface_link_up(self, interface):
        """Handler for interface link_up events.

        Forwards ``interface`` to ``handle_interface_link_up`` of every EVC
        returned by ``get_evcs_by_svc_level``.

        Args:
            interface: The interface carried by the triggering event.
        """
        # Log once per event instead of once per EVC: the message does not
        # depend on the EVC, and the link handlers in this class also log a
        # single line per event.
        log.info("Event handle_interface_link_up %s", interface)
        for evc in self.get_evcs_by_svc_level():
            evc.handle_interface_link_up(interface)
816
817 1
    def handle_interface_link_down(self, interface):
        """Handler for interface link_down events.

        Forwards ``interface`` to ``handle_interface_link_down`` of every
        EVC returned by ``get_evcs_by_svc_level``.

        Args:
            interface: The interface carried by the triggering event.
        """
        # Log once per event instead of once per EVC: the message does not
        # depend on the EVC, and the link handlers in this class also log a
        # single line per event.
        log.info("Event handle_interface_link_down %s", interface)
        for evc in self.get_evcs_by_svc_level():
            evc.handle_interface_link_down(interface)
826
827 1
    @listen_to("kytos/topology.link_down")
    def on_link_down(self, event):
        """Change circuit when link is down or under maintenance.

        Listener for ``kytos/topology.link_down``; the event is forwarded
        unchanged to ``handle_link_down``, which does the actual work.
        """
        self.handle_link_down(event)
831
832 1
    def handle_link_down(self, event):  # pylint: disable=too-many-branches
        """Change circuit when link is down or under maintenance.

        EVCs are sorted into three groups according to how the link in
        ``event.content["link"]`` affects them:

        - ``evcs_with_failover``: affected, with a failover path that is
          not itself affected and whose flows could be obtained. Their
          flows are sent in batches to flow_manager, then the current and
          failover paths are swapped and ``redeployed_link_down`` is
          emitted.
        - ``evcs_normal``: affected, but without a usable failover path.
          An ``evc_affected_by_link_down`` event is emitted for each.
        - ``check_failover``: not affected on the current path. Their
          failover path is set up again if the link affects it.
        """
        link = event.content["link"]
        log.info("Event handle_link_down %s", link)
        # dpid -> list of failover flows still waiting to be sent.
        switch_flows = {}
        evcs_with_failover = []
        evcs_normal = []
        check_failover = []
        for evc in self.get_evcs_by_svc_level():
            if evc.is_affected_by_link(link):
                # if there is no failover path, handles link down the
                # traditional way
                if (
                    not getattr(evc, 'failover_path', None) or
                    evc.is_failover_path_affected_by_link(link)
                ):
                    evcs_normal.append(evc)
                    continue
                try:
                    dpid_flows = evc.get_failover_flows()
                # pylint: disable=broad-except
                except Exception:
                    # Any failure here demotes the EVC to the traditional
                    # handling instead of aborting the whole link-down pass.
                    err = traceback.format_exc().replace("\n", ", ")
                    log.error(
                        f"Ignore Failover path for {evc} due to error: {err}"
                    )
                    evcs_normal.append(evc)
                    continue
                # Merge this EVC's flows into the per-switch accumulator.
                for dpid, flows in dpid_flows.items():
                    switch_flows.setdefault(dpid, [])
                    switch_flows[dpid].extend(flows)
                evcs_with_failover.append(evc)
            else:
                check_failover.append(evc)

        # Send the accumulated flows in rounds: each round emits up to
        # ``offset`` flows per switch and then sleeps BATCH_INTERVAL.
        while switch_flows:
            # A falsy BATCH_SIZE yields None, so the slices below take the
            # whole list and every switch is finished in a single round.
            offset = settings.BATCH_SIZE or None
            # Snapshot the keys: entries are deleted while iterating.
            switches = list(switch_flows.keys())
            for dpid in switches:
                emit_event(
                    self.controller,
                    context="kytos.flow_manager",
                    name="flows.install",
                    content={
                        "dpid": dpid,
                        "flow_dict": {"flows": switch_flows[dpid][:offset]},
                    }
                )
                if offset is None or offset >= len(switch_flows[dpid]):
                    # Nothing left for this switch.
                    del switch_flows[dpid]
                    continue
                # Keep the remainder for the next round.
                switch_flows[dpid] = switch_flows[dpid][offset:]
            time.sleep(settings.BATCH_INTERVAL)

        for evc in evcs_with_failover:
            with evc.lock:
                # Swap current and failover paths, then persist the EVC.
                old_path = evc.current_path
                evc.current_path = evc.failover_path
                evc.failover_path = old_path
                evc.sync()
            emit_event(self.controller, "redeployed_link_down",
                       content=map_evc_event_content(evc))
            log.info(
                f"{evc} redeployed with failover due to link down {link.id}"
            )

        # EVCs without a usable failover path are announced through an
        # event that carries the link id along with the EVC content.
        for evc in evcs_normal:
            emit_event(
                self.controller,
                "evc_affected_by_link_down",
                content={"link_id": link.id} | map_evc_event_content(evc)
            )

        # After handling the hot path, check if new failover paths are needed.
        # Note that EVCs affected by link down will generate a KytosEvent for
        # deployed|redeployed, which will trigger the failover path setup.
        # Thus, we just need to further check the check_failover list
        for evc in check_failover:
            if evc.is_failover_path_affected_by_link(link):
                with evc.lock:
                    evc.setup_failover_path()
913
914 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
    def on_evc_affected_by_link_down(self, event):
        """Redeploy an EVC that was affected by a link going down.

        Listener for ``kytos/mef_eline.evc_affected_by_link_down``; the
        event is forwarded unchanged to
        ``handle_evc_affected_by_link_down``.
        """
        self.handle_evc_affected_by_link_down(event)
918
919 1
    def handle_evc_affected_by_link_down(self, event):
        """Run the EVC's link-down handling and report the outcome.

        The EVC is looked up by ``event.content["evc_id"]``; unknown EVCs
        are ignored. ``evc.handle_link_down()`` is called under the EVC
        lock and its result selects the emitted event:
        ``redeployed_link_down`` when truthy, otherwise
        ``error_redeploy_link_down``.
        """
        content = event.content
        circuit = self.circuits.get(content["evc_id"])
        link_id = content['link_id']
        if not circuit:
            return

        with circuit.lock:
            redeployed = circuit.handle_link_down()

        if redeployed:
            log.info(f"{circuit} redeployed due to link down {link_id}")
            outcome = "redeployed_link_down"
        else:
            outcome = "error_redeploy_link_down"
        emit_event(self.controller, outcome,
                   content=map_evc_event_content(circuit))
933
934 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
    def on_evc_deployed(self, event):
        """Handle EVC deployed|redeployed_link_up|redeployed_link_down.

        The event is forwarded unchanged to ``handle_evc_deployed``.
        """
        self.handle_evc_deployed(event)
938
939 1
    def handle_evc_deployed(self, event):
        """Set up the failover path of the EVC named by the event.

        The EVC is looked up by ``event.content["evc_id"]``. When it is
        known, ``setup_failover_path`` is called while holding its lock;
        otherwise nothing happens.
        """
        circuit = self.circuits.get(event.content["evc_id"])
        if circuit:
            with circuit.lock:
                circuit.setup_failover_path()
946
947 1
    @listen_to("kytos/topology.topology_loaded")
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
        """Load EVCs once the topology is available.

        Listener for ``kytos/topology.topology_loaded``. The event itself
        is not inspected; it only triggers ``load_all_evcs``.
        """
        self.load_all_evcs()
951
952 1
    def load_all_evcs(self):
        """Load every stored circuit that is not in memory yet.

        Circuits are read from ``self.mongo_controller``; each one whose id
        is missing from ``self.circuits`` goes through ``_load_evc``.
        Afterwards an ``evcs_loaded`` event carrying all stored circuits is
        emitted.
        """
        stored = self.mongo_controller.get_circuits()['circuits']
        pending = (
            circuit
            for circuit_id, circuit in stored.items()
            if circuit_id not in self.circuits
        )
        for circuit in pending:
            self._load_evc(circuit)
        emit_event(self.controller, "evcs_loaded",
                   content=dict(stored.items()), timeout=1)
960
961 1
    def _load_evc(self, circuit_dict):
        """Instantiate one stored EVC and register it in memory.

        Returns the EVC once it has been stored in ``self.circuits`` (unless
        that id is already present) and added to the scheduler. Returns
        None when the EVC can not be built (ValueError or KytosTagError,
        logged as an error) or when it is archived.
        """
        try:
            loaded = self._evc_from_dict(circuit_dict)
        except (ValueError, KytosTagError) as error:
            log.error(
                f"Could not load EVC: dict={circuit_dict} error={error}"
            )
            return None

        if not loaded.archived:
            self.circuits.setdefault(loaded.id, loaded)
            self.sched.add(loaded)
            return loaded
        return None
976
977 1
    @listen_to("kytos/flow_manager.flow.error")
    def on_flow_mod_error(self, event):
        """Handle flow mod errors related to an EVC.

        Listener for ``kytos/flow_manager.flow.error``; the event is
        forwarded unchanged to ``handle_flow_mod_error``.
        """
        self.handle_flow_mod_error(event)
981
982 1
    def handle_flow_mod_error(self, event):
        """Remove an EVC's current flows after a failed flow addition.

        Only events whose ``error_command`` is ``"add"`` are considered.
        The EVC is resolved from the cookie of ``event.content["flow"]``;
        when it is known, ``remove_current_flows`` runs under its lock.
        """
        flow = event.content["flow"]
        if event.content.get("error_command") != "add":
            return
        circuit = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
        if not circuit:
            return
        with circuit.lock:
            circuit.remove_current_flows()
992
993 1
    def _evc_dict_with_instances(self, evc_dict):
994
        """Convert some dict values to instance of EVC classes.
995
996
        This method will convert: [UNI, Link]
997
        """
998 1
        data = evc_dict.copy()  # Do not modify the original dict
999 1
        for attribute, value in data.items():
1000
            # Get multiple attributes.
1001
            # Ex: uni_a, uni_z
1002 1
            if "uni" in attribute:
1003 1
                try:
1004 1
                    data[attribute] = self._uni_from_dict(value)
1005 1
                except ValueError as exception:
1006 1
                    result = "Error creating UNI: Invalid value"
1007 1
                    raise ValueError(result) from exception
1008
1009 1
            if attribute == "circuit_scheduler":
1010 1
                data[attribute] = []
1011 1
                for schedule in value:
1012 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
1013
1014
            # Get multiple attributes.
1015
            # Ex: primary_links,
1016
            #     backup_links,
1017
            #     current_links_cache,
1018
            #     primary_links_cache,
1019
            #     backup_links_cache
1020 1
            if "links" in attribute:
1021 1
                data[attribute] = [
1022
                    self._link_from_dict(link) for link in value
1023
                ]
1024
1025
            # Ex: current_path,
1026
            #     primary_path,
1027
            #     backup_path
1028 1
            if "path" in attribute and attribute != "dynamic_backup_path":
1029 1
                data[attribute] = Path(
1030
                    [self._link_from_dict(link) for link in value]
1031
                )
1032
1033 1
        return data
1034
1035 1
    def _evc_from_dict(self, evc_dict):
        """Build an EVC from its dict form.

        Values are first converted by ``_evc_dict_with_instances``; the
        NApp's ``table_group`` is then passed along as a keyword argument.
        """
        kwargs = self._evc_dict_with_instances(evc_dict)
        kwargs.update(table_group=self.table_group)
        return EVC(self.controller, **kwargs)
1039
1040 1
    def _uni_from_dict(self, uni_dict):
1041
        """Return a UNI object from python dict."""
1042 1
        if uni_dict is None:
1043 1
            return False
1044
1045 1
        interface_id = uni_dict.get("interface_id")
1046 1
        interface = self.controller.get_interface_by_id(interface_id)
1047 1
        if interface is None:
1048 1
            result = (
1049
                "Error creating UNI:"
1050
                + f"Could not instantiate interface {interface_id}"
1051
            )
1052 1
            raise ValueError(result) from ValueError
1053 1
        tag_convert = {1: "vlan"}
1054 1
        tag_dict = uni_dict.get("tag", None)
1055 1
        if tag_dict:
1056 1
            tag_type = tag_dict.get("tag_type")
1057 1
            tag_type = tag_convert.get(tag_type, tag_type)
1058 1
            tag_value = tag_dict.get("value")
1059 1
            if isinstance(tag_value, list):
1060 1
                tag_value = get_tag_ranges(tag_value)
1061 1
                mask_list = get_vlan_tags_and_masks(tag_value)
1062 1
                tag = TAGRange(tag_type, tag_value, mask_list)
1063
            else:
1064 1
                tag = TAG(tag_type, tag_value)
1065
        else:
1066 1
            tag = None
1067 1
        uni = UNI(interface, tag)
1068 1
        return uni
1069
1070 1
    def _link_from_dict(self, link_dict):
1071
        """Return a Link object from python dict."""
1072 1
        id_a = link_dict.get("endpoint_a").get("id")
1073 1
        id_b = link_dict.get("endpoint_b").get("id")
1074
1075 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
1076 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
1077 1
        if not endpoint_a:
1078 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
1079 1
            raise ValueError(error_msg)
1080 1
        if not endpoint_b:
1081
            error_msg = f"Could not get interface endpoint_b id {id_b}"
1082
            raise ValueError(error_msg)
1083
1084 1
        link = Link(endpoint_a, endpoint_b)
1085 1
        if "metadata" in link_dict:
1086 1
            link.extend_metadata(link_dict.get("metadata"))
1087
1088 1
        s_vlan = link.get_metadata("s_vlan")
1089 1
        if s_vlan:
1090 1
            tag = TAG.from_dict(s_vlan)
1091 1
            if tag is False:
1092
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
1093
                raise ValueError(error_msg)
1094 1
            link.update_metadata("s_vlan", tag)
1095 1
        return link
1096
1097 1
    def _find_evc_by_schedule_id(self, schedule_id):
1098
        """
1099
        Find an EVC and CircuitSchedule based on schedule_id.
1100
1101
        :param schedule_id: Schedule ID
1102
        :return: EVC and Schedule
1103
        """
1104 1
        circuits = self._get_circuits_buffer()
1105 1
        found_schedule = None
1106 1
        evc = None
1107
1108
        # pylint: disable=unused-variable
1109 1
        for c_id, circuit in circuits.items():
1110 1
            for schedule in circuit.circuit_scheduler:
1111 1
                if schedule.id == schedule_id:
1112 1
                    found_schedule = schedule
1113 1
                    evc = circuit
1114 1
                    break
1115 1
            if found_schedule:
1116 1
                break
1117 1
        return evc, found_schedule
1118
1119 1
    def _get_circuits_buffer(self):
1120
        """
1121
        Return the circuit buffer.
1122
1123
        If the buffer is empty, try to load data from mongodb.
1124
        """
1125 1
        if not self.circuits:
1126
            # Load circuits from mongodb to buffer
1127 1
            circuits = self.mongo_controller.get_circuits()['circuits']
1128 1
            for c_id, circuit in circuits.items():
1129 1
                evc = self._evc_from_dict(circuit)
1130 1
                self.circuits[c_id] = evc
1131 1
        return self.circuits
1132
1133
    # pylint: disable=attribute-defined-outside-init
1134 1
    @alisten_to("kytos/of_multi_table.enable_table")
    async def on_table_enabled(self, event):
        """Apply a table group configuration announced for mef_eline.

        The ``mef_eline`` entry of the event content is validated against
        ``settings.TABLE_GROUP_ALLOWED``. The first disallowed group is
        logged as an error and nothing is applied. Otherwise
        ``self.table_group`` is updated and
        ``kytos/mef_eline.enable_table`` is emitted with the result.
        """
        requested = event.content.get("mef_eline")
        if not requested:
            return

        for group in requested:
            if group in settings.TABLE_GROUP_ALLOWED:
                continue
            log.error(f'The table group "{group}" is not allowed for '
                      f'mef_eline. Allowed table groups are '
                      f'{settings.TABLE_GROUP_ALLOWED}')
            return

        self.table_group.update(requested)
        await aemit_event(
            self.controller,
            "kytos/mef_eline.enable_table",
            {"group_table": self.table_group},
        )
1150