Passed
Push — master ( 94ec16...630564 )
by Vinicius
03:05 queued 31s
created

build.main.Main._uni_from_dict()   B

Complexity

Conditions 5

Size

Total Lines 29
Code Lines 24

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 22
CRAP Score 5

Importance

Changes 0
Metric Value
cc 5
eloc 24
nop 2
dl 0
loc 29
ccs 22
cts 22
cp 1
crap 5
rs 8.8373
c 0
b 0
f 0
1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from collections import defaultdict
10 1
from threading import Lock
11 1
from typing import Optional
12
13 1
from pydantic import ValidationError
14
15 1
from kytos.core import KytosNApp, log, rest
16 1
from kytos.core.events import KytosEvent
17 1
from kytos.core.exceptions import KytosTagError
18 1
from kytos.core.helpers import (alisten_to, listen_to, load_spec, now,
19
                                validate_openapi)
20 1
from kytos.core.interface import TAG, UNI, TAGRange
21 1
from kytos.core.link import Link
22 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
23
                                 get_json_or_400)
24 1
from kytos.core.tag_ranges import get_tag_ranges
25 1
from napps.kytos.mef_eline import controllers, settings
26 1
from napps.kytos.mef_eline.exceptions import (DisabledSwitch,
27
                                              DuplicatedNoTagUNI, InvalidPath)
28 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
29
                                          Path)
30 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
31 1
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
32
                                         emit_event, get_vlan_tags_and_masks,
33
                                         map_evc_event_content)
34
35
36
# pylint: disable=too-many-public-methods
37 1
class Main(KytosNApp):
38
    """Main class of amlight/mef_eline NApp.
39
40
    This class is the entry point for this napp.
41
    """
42
43 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
44
45 1
    def setup(self):
        """Initialise the NApp state; stands in for ``__init__``.

        The controller invokes this hook automatically once the application
        has been loaded, so every start-up routine belongs here.
        """
        # Scheduler object that handles the circuit events.
        self.sched = Scheduler()

        # Controller object used to save and load circuits.
        self.mongo_controller = self.get_eline_controller()
        self.mongo_controller.bootstrap_indexes()

        # Hand the controller over to the manager of the dynamic paths.
        DynamicPathManager.set_controller(self.controller)

        # In-memory buffer of the EVCs created so far. Every
        # create/update/delete must also be synced to MongoDB.
        self.circuits = {}

        self._intf_events = defaultdict(dict)
        self._lock_interfaces = defaultdict(Lock)
        self.table_group = {"epl": 0, "evpl": 0}
        # Lock taken by ``execute`` around the consistency routine.
        self._lock = Lock()
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

        self.load_all_evcs()
        self._topology_updated_at = None
75
76 1
    def get_evcs_by_svc_level(self) -> list:
        """Return the buffered circuits ordered by priority.

        Circuits with a higher ``service_level`` come first; ties are broken
        by ascending ``creation_time``. In the future, as more operations are
        offloaded, this ordering should be obtained from the DB.
        """
        def priority(evc):
            # Negating the level yields descending order from an ascending
            # sort while keeping creation_time ascending.
            return (-evc.service_level, evc.creation_time)

        return sorted(self.circuits.values(), key=priority)
83
84 1
    @staticmethod
    def get_eline_controller():
        """Build a new ``controllers.ELineController`` and return it."""
        eline_controller = controllers.ELineController()
        return eline_controller
88
89 1
    def execute(self):
        """Run one round of the consistency routine.

        The round is skipped, without waiting, when a previous round still
        holds ``self._lock``.
        """
        # Acquire without blocking. Testing ``locked()`` and then entering
        # ``with self._lock`` leaves a window in which another thread can
        # take the lock, making this call block instead of being skipped.
        if not self._lock.acquire(blocking=False):
            return
        try:
            log.debug("Starting consistency routine")
            self.execute_consistency()
        finally:
            # Always release, even if the routine raised.
            self._lock.release()
        log.debug("Finished consistency routine")
97
98 1
    def should_be_checked(self, circuit):
        """Tell whether the circuit meets the conditions to be checked.

        Returns True only for a circuit that is enabled, inactive, not
        locked, has neither a recently removed flow nor a recent update, has
        active UNIs and is either intra-switch or has a current path.
        """
        if not circuit.is_enabled() or circuit.is_active():
            return False
        if circuit.lock.locked():
            return False
        if circuit.has_recent_removed_flow() or circuit.is_recent_updated():
            return False
        if not circuit.are_unis_active(self.controller.switches):
            return False
        # If an inter-switch EVC does not have a current_path, it does not
        # make sense to run sdntrace on it.
        return bool(circuit.is_intra_switch() or circuit.current_path)
114
115 1
    def execute_consistency(self):
        """Run one consistency pass over the circuits.

        Buffered circuits accepted by ``should_be_checked`` are traced with
        ``EVCDeploy.check_list_traces``. Those reported as checked are
        activated and synced; the others get ``execution_rounds`` increased
        and are redeployed once it exceeds ``settings.WAIT_FOR_OLD_PATH``.
        Finally, circuits returned by the database controller that are not
        in the buffer are loaded with ``_load_evc``.
        """
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
        circuits_to_check = []
        for circuit in self.get_evcs_by_svc_level():
            # Whatever remains in stored_circuits afterwards is not buffered.
            stored_circuits.pop(circuit.id, None)
            if self.should_be_checked(circuit):
                circuits_to_check.append(circuit)

        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
        for circuit in circuits_to_check:
            if circuits_checked.get(circuit.id):
                circuit.execution_rounds = 0
                log.info(f"{circuit} enabled but inactive - activating")
                with circuit.lock:
                    circuit.activate()
                    circuit.sync()
                continue

            circuit.execution_rounds += 1
            if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
                log.info(f"{circuit} enabled but inactive - redeploy")
                with circuit.lock:
                    circuit.deploy()

        for circuit_id, circuit_data in stored_circuits.items():
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
            self._load_evc(circuit_data)
141
142 1
    def shutdown(self):
        """Run when the NApp is unloaded.

        There is no cleanup to perform at the moment; any future cleanup
        procedure belongs here.
        """
147
148 1
    @rest("/v2/evc/", methods=["GET"])
    def list_circuits(self, request: Request) -> JSONResponse:
        """Endpoint to return the stored circuits.

        The ``archived`` query argument (default ``"false"``) is lower-cased
        and forwarded to the database controller; every other query argument
        is forwarded as a metadata filter.
        """
        log.debug("list_circuits /v2/evc")
        query = request.query_params
        archived = query.get("archived", "false").lower()
        metadata = {
            key: value for key, value in query.items() if key != "archived"
        }
        found = self.mongo_controller.get_circuits(archived=archived,
                                                   metadata=metadata)
        return JSONResponse(found['circuits'])
163
164 1
    @rest("/v2/evc/schedule", methods=["GET"])
    def list_schedules(self, _request: Request) -> JSONResponse:
        """Endpoint to return every schedule stored for every circuit.

        The JSON response follows this template:
        [{"schedule_id": <schedule_id>,
         "circuit_id": <circuit_id>,
         "schedule": <schedule object>}]
        """
        log.debug("list_schedules /v2/evc/schedule")
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
        status = 200
        if not circuits:
            # NOTE(review): an empty object is returned here whereas the
            # non-empty case returns a list - confirm clients rely on this.
            return JSONResponse({}, status_code=status)

        # Circuits without a (non-empty) "circuit_scheduler" contribute
        # nothing to the result.
        result = [
            {
                "schedule_id": scheduler.get("id"),
                "circuit_id": circuit.get("id"),
                "schedule": scheduler,
            }
            for circuit in circuits
            for scheduler in circuit.get("circuit_scheduler") or ()
        ]

        log.debug("list_schedules result %s %s", result, status)
        return JSONResponse(result, status_code=status)
195
196 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
    def get_circuit(self, request: Request) -> JSONResponse:
        """Endpoint to return the circuit matching the ``circuit_id`` path arg.

        Raises:
            HTTPException: 404 when the database controller finds no circuit.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("get_circuit /v2/evc/%s", circuit_id)
        circuit = self.mongo_controller.get_circuit(circuit_id)
        if circuit:
            status = 200
            log.debug("get_circuit result %s %s", circuit, status)
            return JSONResponse(circuit, status_code=status)

        result = f"circuit_id {circuit_id} not found"
        log.debug("get_circuit result %s %s", result, 404)
        raise HTTPException(404, detail=result)
209
210
    # pylint: disable=too-many-branches, too-many-statements
211 1
    @rest("/v2/evc/", methods=["POST"])
    @validate_openapi(spec)
    def create_circuit(self, request: Request) -> JSONResponse:
        """Try to create a new circuit from the request payload.

        Steps, in order:

        1. Build the EVC from the payload (400 on ValueError/KytosTagError).
        2. Reject UNIs on disabled components (409).
        3. Validate the requested primary and backup paths, if any (400).
        4. Require equal UNI tag lists and a primary or dynamic path (400).
        5. Reject duplicated no-tag UNIs (409).
        6. Reserve the UNI tags (400 on KytosTagError).
        7. Persist the EVC; on a validation error the reserved tags are
           released again and 400 is returned.
        8. Buffer the EVC, register it with the scheduler and, if it has no
           schedule, deploy it right away.
        9. Emit the "created" event and answer 201 with the circuit id.

        Raises:
            HTTPException: with status 400 or 409 as described above.
        """
        # Try to create the circuit object
        log.debug("create_circuit /v2/evc/")
        data = get_json_or_400(request, self.controller.loop)

        try:
            evc = self._evc_from_dict(data)
        except (ValueError, KytosTagError) as exception:
            log.debug("create_circuit result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception
        try:
            check_disabled_component(evc.uni_a, evc.uni_z)
        except DisabledSwitch as exception:
            log.debug("create_circuit result %s %s", exception, 409)
            raise HTTPException(
                    409,
                    detail=f"Path is not valid: {exception}"
                ) from exception

        # Primary and backup paths go through the very same validation.
        for path_name in ("primary_path", "backup_path"):
            path = getattr(evc, path_name)
            if not path:
                continue
            try:
                path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise HTTPException(
                    400,
                    detail=f"{path_name} is not valid: {exception}"
                ) from exception

        if not evc._tag_lists_equal():
            detail = "UNI_A and UNI_Z tag lists should be the same."
            raise HTTPException(400, detail=detail)

        try:
            evc._validate_has_primary_or_dynamic()
        except ValueError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        try:
            self._check_no_tag_duplication(evc.id, evc.uni_a, evc.uni_z)
        except DuplicatedNoTagUNI as exception:
            log.debug("create_circuit result %s %s", exception, 409)
            raise HTTPException(409, detail=str(exception)) from exception

        try:
            self._use_uni_tags(evc)
        except KytosTagError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        # save circuit
        try:
            evc.sync()
        except ValidationError as exception:
            # The tags were reserved just above; give them back, otherwise
            # they stay allocated to a circuit that was never created.
            evc.remove_uni_tags()
            log.debug("create_circuit result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception

        # store circuit in dictionary
        self.circuits[evc.id] = evc

        # Schedule the circuit deploy
        self.sched.add(evc)

        # Circuit has no schedule, deploy now
        if not evc.circuit_scheduler:
            with evc.lock:
                evc.deploy()

        # Notify users
        result = {"circuit_id": evc.id}
        status = 201
        log.debug("create_circuit result %s %s", result, status)
        emit_event(self.controller, name="created",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)
324
325 1
    @staticmethod
    def _use_uni_tags(evc):
        """Reserve the VLAN of both UNIs of ``evc``.

        If reserving the UNI_Z VLAN raises KytosTagError, the UNI_A VLAN that
        was already reserved is made available again before re-raising.
        """
        uni_a, uni_z = evc.uni_a, evc.uni_z
        evc._use_uni_vlan(uni_a)
        try:
            evc._use_uni_vlan(uni_z)
        except KytosTagError as err:
            # Roll back the first reservation so no tag is left in use.
            evc.make_uni_vlan_available(uni_a)
            raise err
335
336 1
    @listen_to('kytos/flow_manager.flow.removed')
    def on_flow_delete(self, event):
        """Listen to flow removed events and delegate their handling."""
        self.handle_flow_delete(event)
340
341 1
    def handle_flow_delete(self, event):
        """Record the flow removal on the EVC derived from the flow cookie.

        Nothing happens when no buffered EVC matches the cookie.
        """
        removed_flow = event.content["flow"]
        circuit_id = EVC.get_id_from_cookie(removed_flow.cookie)
        evc = self.circuits.get(circuit_id)
        if not evc:
            return
        log.debug("Flow removed in EVC %s", evc.id)
        evc.set_flow_removed_at()
348
349 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update(self, request: Request) -> JSONResponse:
        """Update a circuit based on the request payload.

        The EVC attributes (creation_time, active, current_path,
        failover_path, _id, archived) can't be updated - presumably this is
        enforced by ``evc.update``; verify there.

        After a successful update the circuit is removed and/or deployed
        according to the ``enable``/``redeploy`` values returned by
        ``evc.update`` and to whether the circuit is currently active.

        Raises:
            HTTPException: 404 for an unknown circuit; 409 for an archived
                circuit, a duplicated no-tag UNI or a disabled switch; 400
                for invalid data.
        """
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = request.path_params["circuit_id"]
        log.debug("update /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("update result %s %s", result, 404)
            # Chain from the caught instance so the missing key is kept.
            raise HTTPException(404, detail=result) from error

        if evc.archived:
            result = "Can't update archived EVC"
            log.debug("update result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        try:
            updated_data = self._evc_dict_with_instances(data)
            self._check_no_tag_duplication(
                circuit_id, updated_data.get("uni_a"),
                updated_data.get("uni_z")
            )
            enable, redeploy = evc.update(**updated_data)
        except (ValueError, KytosTagError, ValidationError) as exception:
            log.debug("update result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception
        except DuplicatedNoTagUNI as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(409, detail=str(exception)) from exception
        except DisabledSwitch as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(
                    409,
                    detail=f"Path is not valid: {exception}"
                ) from exception

        if evc.is_active():
            if enable is False:  # disable if active
                with evc.lock:
                    evc.remove()
            elif redeploy is not None:  # redeploy if active
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        else:
            if enable is True:  # enable if inactive
                with evc.lock:
                    evc.deploy()
            elif evc.is_enabled() and redeploy:
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        result = {evc.id: evc.as_dict()}
        status = 200

        log.debug("update result %s %s", result, status)
        emit_event(self.controller, "updated",
                   content=map_evc_event_content(evc, **data))
        return JSONResponse(result, status_code=status)
415
416 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
    def delete_circuit(self, request: Request) -> JSONResponse:
        """Remove a circuit.

        Under the EVC lock: the current and failover flows are removed, the
        EVC is deactivated and disabled, dropped from the scheduler,
        archived, has its UNI tags released and is then synced. A "deleted"
        event is emitted afterwards.

        Raises:
            HTTPException: 404 when the circuit is unknown or already
                archived.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("delete_circuit result %s %s", result, 404)
            # Chain from the caught instance so the missing key is kept.
            raise HTTPException(404, detail=result) from error

        if evc.archived:
            result = f"Circuit {circuit_id} already removed"
            log.debug("delete_circuit result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        log.info("Removing %s", evc)
        with evc.lock:
            evc.remove_current_flows()
            evc.remove_failover_flows(sync=False)
            evc.deactivate()
            evc.disable()
            self.sched.remove(evc)
            evc.archive()
            evc.remove_uni_tags()
            evc.sync()
        log.info("EVC removed. %s", evc)
        result = {"response": f"Circuit {circuit_id} removed"}
        status = 200

        log.debug("delete_circuit result %s %s", result, status)
        emit_event(self.controller, "deleted",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)
455
456 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
    def get_metadata(self, request: Request) -> JSONResponse:
        """Return the metadata of the buffered EVC given by ``circuit_id``.

        Raises:
            HTTPException: 404 when a KeyError is raised, i.e. the circuit is
                not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        try:
            payload = {"metadata": self.circuits[circuit_id].metadata}
            return JSONResponse(payload)
        except KeyError as error:
            detail = f"circuit_id {circuit_id} not found."
            raise HTTPException(404, detail=detail) from error
469
470 1 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
    @validate_openapi(spec)
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to a bulk of EVCs.

        The payload carries ``circuit_ids`` plus the metadata to add. The
        database is updated first, then every buffered EVC is extended.

        Raises:
            HTTPException: 404 listing the ids that raised KeyError while
                being updated in the buffer.
        """
        data = get_json_or_400(request, self.controller.loop)
        # What is left in ``data`` after the pop is the metadata itself.
        circuit_ids = data.pop("circuit_ids")

        self.mongo_controller.update_evcs(circuit_ids, data, "add")

        fail_evcs = []
        for circuit_id in circuit_ids:
            try:
                self.circuits[circuit_id].extend_metadata(data)
            except KeyError:
                fail_evcs.append(circuit_id)

        if fail_evcs:
            raise HTTPException(404, detail=fail_evcs)
        return JSONResponse("Operation successful", status_code=201)
490
491 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
    @validate_openapi(spec)
    def add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to an EVC and sync it.

        Raises:
            HTTPException: 400 when the payload is not a dict; 404 when the
                circuit is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        metadata = get_json_or_400(request, self.controller.loop)
        if not isinstance(metadata, dict):
            # The message must be an f-string; otherwise the client gets the
            # literal text "{metadata}" instead of the offending value.
            raise HTTPException(
                400,
                detail=f"Invalid metadata value: {metadata}"
            )
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error

        evc.extend_metadata(metadata)
        evc.sync()
        return JSONResponse("Operation successful", status_code=201)
510
511 1 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
    @validate_openapi(spec)
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
        """Delete the metadata ``key`` from a bulk of EVCs.

        The payload carries ``circuit_ids``. The database is updated first,
        then the key is removed from every buffered EVC.

        Raises:
            HTTPException: 404 listing the ids that raised KeyError while
                being updated in the buffer.
        """
        data = get_json_or_400(request, self.controller.loop)
        key = request.path_params["key"]
        circuit_ids = data.pop("circuit_ids")
        self.mongo_controller.update_evcs(circuit_ids, {key: ""}, "del")

        fail_evcs = []
        for circuit_id in circuit_ids:
            try:
                self.circuits[circuit_id].remove_metadata(key)
            except KeyError:
                fail_evcs.append(circuit_id)

        if fail_evcs:
            raise HTTPException(404, detail=fail_evcs)
        return JSONResponse("Operation successful")
531
532 1
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
    def delete_metadata(self, request: Request) -> JSONResponse:
        """Delete the metadata ``key`` from an EVC and sync it.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer.
        """
        params = request.path_params
        circuit_id = params["circuit_id"]
        key = params["key"]
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            detail = f"circuit_id {circuit_id} not found."
            raise HTTPException(404, detail=detail) from error

        evc.remove_metadata(key)
        evc.sync()
        return JSONResponse("Operation successful")
548
549 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
    def redeploy(self, request: Request) -> JSONResponse:
        """Endpoint to force the redeployment of an EVC.

        An enabled EVC has its current flows removed and is deployed again
        under its lock (202). A disabled EVC is left untouched (409).

        Raises:
            HTTPException: 404 when the circuit is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            # Chain from the caught instance so the missing key is kept.
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found"
            ) from error
        if evc.is_enabled():
            with evc.lock:
                evc.remove_current_flows()
                evc.deploy()
            result = {"response": f"Circuit {circuit_id} redeploy received."}
            status = 202
        else:
            result = {"response": f"Circuit {circuit_id} is disabled."}
            status = 409

        return JSONResponse(result, status_code=status)
572
573 1
    @rest("/v2/evc/schedule/", methods=["POST"])
    @validate_openapi(spec)
    def create_schedule(self, request: Request) -> JSONResponse:
        """
        Create a new schedule for a given circuit.

        This service does not check for conflicts with other schedules.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }

        Raises:
            HTTPException: 404 when the circuit is not found; 409 when it is
                archived.
        """
        log.debug("create_schedule /v2/evc/schedule/")
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = data["circuit_id"]
        schedule_data = data["schedule"]

        # Look the EVC up in the circuits buffer.
        evc = self._get_circuits_buffer().get(circuit_id)
        if not evc:
            result = f"circuit_id {circuit_id} not found"
            log.debug("create_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # Archived (deleted) circuits can not be modified.
        if evc.archived:
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
            log.debug("create_schedule result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        new_schedule = CircuitSchedule.from_dict(schedule_data)

        # Start the schedule list when the EVC has none yet.
        if not evc.circuit_scheduler:
            evc.circuit_scheduler = []
        evc.circuit_scheduler.append(new_schedule)

        # Register the job, then save the circuit to MongoDB.
        self.sched.add_circuit_job(evc, new_schedule)
        evc.sync()

        result = new_schedule.as_dict()
        status = 201

        log.debug("create_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)
634
635 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update_schedule(self, request: Request) -> JSONResponse:
        """Update a schedule.

        Replace every attribute of the given schedule of an EVC with the
        payload values; the schedule id is preserved.
        Payload example:
            {
              "date": "2019-08-07T14:52:10.967Z",
              "interval": "string",
              "frequency": "1 * * * *",
              "action": "create"
            }

        Raises:
            HTTPException: 404 when the schedule is not found; 409 when its
                circuit is archived.
        """
        data = get_json_or_400(request, self.controller.loop)
        schedule_id = request.path_params["schedule_id"]
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)

        # Try to find the schedule and the circuit owning it.
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
        if not found_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("update_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # Archived (deleted) circuits can not be modified.
        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("update_schedule result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        new_schedule = CircuitSchedule.from_dict(data)
        new_schedule.id = found_schedule.id

        # Swap the old schedule for the modified one.
        schedules = evc.circuit_scheduler
        schedules.remove(found_schedule)
        schedules.append(new_schedule)

        # Replace the scheduled job as well, then save the EVC to MongoDB.
        self.sched.cancel_job(found_schedule.id)
        self.sched.add_circuit_job(evc, new_schedule)
        evc.sync()

        result = new_schedule.as_dict()
        status = 200

        log.debug("update_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)
686
687 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
    def delete_schedule(self, request: Request) -> JSONResponse:
        """Remove a circuit schedule.

        The schedule is removed from its EVC, its scheduled job is cancelled
        and the EVC is then synced.

        Raises:
            HTTPException: 404 when the schedule is not found; 409 when its
                circuit is archived.
        """
        schedule_id = request.path_params["schedule_id"]
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)

        if not found_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("delete_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # Archived (deleted) circuits can not be modified.
        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("delete_schedule result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        # Drop the schedule, cancel its job and save the EVC to MongoDB.
        evc.circuit_scheduler.remove(found_schedule)
        self.sched.cancel_job(found_schedule.id)
        evc.sync()

        result = "Schedule removed"
        status = 200

        log.debug("delete_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)
723
724 1
    def _check_no_tag_duplication(
        self,
        evc_id: str,
        uni_a: Optional[UNI] = None,
        uni_z: Optional[UNI] = None
    ):
        """Check the given UNIs with no tag against the other circuits.

        Each UNI whose ``user_tag`` is None is passed to
        ``check_no_tag_duplicate`` of every non-archived buffered circuit
        other than ``evc_id``; that call is expected to raise
        DuplicatedNoTagUNI when a duplication is found.

        Args:
            evc_id (str): id of the EVC owning the UNIs; it is not checked
                against itself.
            uni_a (UNI): optional UNI_A to be analyzed.
            uni_z (UNI): optional UNI_Z to be analyzed.
        """
        # No UNIs
        if not (uni_a or uni_z):
            return

        a_without_tag = uni_a and not uni_a.user_tag
        z_without_tag = uni_z and not uni_z.user_tag
        if not a_without_tag and not z_without_tag:
            return

        # Iterate over a copy so the buffer may change in the meantime.
        for circuit in self.circuits.copy().values():
            if not circuit.archived and circuit._id != evc_id:
                for uni in (uni_a, uni_z):
                    if uni and uni.user_tag is None:
                        circuit.check_no_tag_duplicate(uni)
749
750 1
    @listen_to("kytos/topology.link_up")
    def on_link_up(self, event):
        """Listen to topology link_up events and delegate the handling."""
        self.handle_link_up(event)
754
755 1
    def handle_link_up(self, event):
        """Notify every enabled, non-archived EVC that a link came up.

        Each EVC is handled while holding its own lock.
        """
        log.info("Event handle_link_up %s", event.content["link"])
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_enabled() or evc.archived:
                continue
            with evc.lock:
                evc.handle_link_up(event.content["link"])
762
763
    # Possibly replace this with interruptions?
764 1
    @listen_to(
        '.*.switch.interface.(link_up|link_down|created|deleted)'
    )
    def on_interface_link_change(self, event: KytosEvent):
        """
        Listen to interface link_up, link_down, created and deleted events
        and delegate the handling.
        """
        self.handle_on_interface_link_change(event)
772
773 1
    def handle_on_interface_link_change(self, event: KytosEvent):
        """
        Debounce interface events {link_(up, down), created, deleted}.

        To avoid multiple database updates (link flap):
        Events are tracked per interface id, under a per-interface lock.
        The first event received for an interface starts a waiting period
        of settings.UNI_STATE_CHANGE_DELAY seconds.
        While waiting, newer events only replace the one stored in
        self._intf_events; events older than the stored one are dropped.
        After the wait, the last stored event is the one processed.
        """
        iface = event.content.get("interface")
        intf_id = iface.id
        with self._lock_interfaces[intf_id]:
            received_at = event.timestamp
            # Drop out of order events
            if (
                intf_id in self._intf_events
                and self._intf_events[intf_id]["event"].timestamp
                > received_at
            ):
                return
            self._intf_events[intf_id]["event"] = event
            # Another call is already waiting and will process this event.
            if "last_acquired" in self._intf_events[intf_id]:
                return
            self._intf_events[intf_id]["last_acquired"] = now()

        # Sleep outside the lock so newer events can be recorded meanwhile.
        time.sleep(settings.UNI_STATE_CHANGE_DELAY)

        with self._lock_interfaces[intf_id]:
            latest = self._intf_events[intf_id]["event"]
            self._intf_events[intf_id].pop('last_acquired', None)
            event_type = latest.name.rpartition('.')[2]
            if event_type in ('link_up', 'created'):
                self.handle_interface_link_up(iface)
            elif event_type in ('link_down', 'deleted'):
                self.handle_interface_link_down(iface)
805
806 1
    def handle_interface_link_up(self, interface):
        """
        Handler for interface link_up events.

        Forwards the interface to every EVC returned by
        get_evcs_by_svc_level().

        Args:
            interface: interface that went up (or was created).
        """
        # The message does not depend on the EVC: log it once per event
        # instead of once per circuit.
        log.info("Event handle_interface_link_up %s", interface)
        for evc in self.get_evcs_by_svc_level():
            evc.handle_interface_link_up(interface)
815
816 1
    def handle_interface_link_down(self, interface):
        """
        Handler for interface link_down events.

        Forwards the interface to every EVC returned by
        get_evcs_by_svc_level().

        Args:
            interface: interface that went down (or was deleted).
        """
        # The message does not depend on the EVC: log it once per event
        # instead of once per circuit.
        log.info("Event handle_interface_link_down %s", interface)
        for evc in self.get_evcs_by_svc_level():
            evc.handle_interface_link_down(interface)
825
826 1
    @listen_to("kytos/topology.link_down")
    def on_link_down(self, event):
        """Listen to topology link_down events and delegate the handling."""
        self.handle_link_down(event)
830
831 1
    def handle_link_down(self, event):  # pylint: disable=too-many-branches
        """Change circuits when a link is down or under maintenance.

        Affected EVCs with a failover path not using the link get their
        failover flows installed in batches and have current and failover
        paths swapped. Other affected EVCs get an
        "evc_affected_by_link_down" event. EVCs not affected by the link
        get a new failover path when their failover path uses the link.
        """
        link = event.content["link"]
        log.info("Event handle_link_down %s", link)
        flows_by_dpid = {}
        failover_evcs = []
        regular_evcs = []
        unaffected_evcs = []
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_affected_by_link(link):
                unaffected_evcs.append(evc)
                continue
            # Without a usable failover path, handle link down the
            # traditional way.
            usable_failover = (
                getattr(evc, 'failover_path', None) and
                not evc.is_failover_path_affected_by_link(link)
            )
            if not usable_failover:
                regular_evcs.append(evc)
                continue
            try:
                dpid_flows = evc.get_failover_flows()
            # pylint: disable=broad-except
            except Exception:
                err = traceback.format_exc().replace("\n", ", ")
                log.error(
                    f"Ignore Failover path for {evc} due to error: {err}"
                )
                regular_evcs.append(evc)
                continue
            for dpid, flows in dpid_flows.items():
                flows_by_dpid.setdefault(dpid, []).extend(flows)
            failover_evcs.append(evc)

        # Send the failover flows, at most BATCH_SIZE per switch per round.
        while flows_by_dpid:
            batch_size = settings.BATCH_SIZE or None
            for dpid in list(flows_by_dpid):
                pending = flows_by_dpid[dpid]
                emit_event(
                    self.controller,
                    context="kytos.flow_manager",
                    name="flows.install",
                    content={
                        "dpid": dpid,
                        "flow_dict": {"flows": pending[:batch_size]},
                    }
                )
                if batch_size is None or batch_size >= len(pending):
                    del flows_by_dpid[dpid]
                else:
                    flows_by_dpid[dpid] = pending[batch_size:]
            time.sleep(settings.BATCH_INTERVAL)

        for evc in failover_evcs:
            with evc.lock:
                # Swap current and failover paths, then persist.
                previous_path = evc.current_path
                evc.current_path = evc.failover_path
                evc.failover_path = previous_path
                evc.sync()
            emit_event(self.controller, "redeployed_link_down",
                       content=map_evc_event_content(evc))
            log.info(
                f"{evc} redeployed with failover due to link down {link.id}"
            )

        for evc in regular_evcs:
            emit_event(
                self.controller,
                "evc_affected_by_link_down",
                content={"link_id": link.id} | map_evc_event_content(evc)
            )

        # After handling the hot path, check if new failover paths are
        # needed. EVCs affected by link down will generate a KytosEvent for
        # deployed|redeployed, which will trigger the failover path setup.
        # Thus, only the unaffected EVCs need a further check here.
        for evc in unaffected_evcs:
            if not evc.is_failover_path_affected_by_link(link):
                continue
            with evc.lock:
                evc.setup_failover_path()
912
913 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
    def on_evc_affected_by_link_down(self, event):
        """Listen to evc_affected_by_link_down and delegate the handling."""
        self.handle_evc_affected_by_link_down(event)
917
918 1
    def handle_evc_affected_by_link_down(self, event):
        """Run the EVC link down handling and publish its outcome.

        Emits "redeployed_link_down" when ``evc.handle_link_down()``
        returns a truthy value, "error_redeploy_link_down" otherwise.
        Unknown EVC ids are ignored.
        """
        content = event.content
        evc = self.circuits.get(content["evc_id"])
        link_id = content['link_id']
        if not evc:
            return
        with evc.lock:
            redeployed = evc.handle_link_down()
        if redeployed:
            log.info(f"{evc} redeployed due to link down {link_id}")
            event_name = "redeployed_link_down"
        else:
            event_name = "error_redeploy_link_down"
        emit_event(self.controller, event_name,
                   content=map_evc_event_content(evc))
932
933 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
    def on_evc_deployed(self, event):
        """Listen to EVC deployed|redeployed_link_(up|down) events."""
        self.handle_evc_deployed(event)
937
938 1
    def handle_evc_deployed(self, event):
        """Set up the failover path of the EVC named in the event.

        Events whose "evc_id" is not a known circuit are ignored.
        """
        evc = self.circuits.get(event.content["evc_id"])
        if evc:
            with evc.lock:
                evc.setup_failover_path()
945
946 1
    @listen_to("kytos/topology.topology_loaded")
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
        """Load all EVCs when the topology_loaded event is received."""
        self.load_all_evcs()
950
951 1
    def load_all_evcs(self):
        """Load every stored EVC not yet in memory, then emit evcs_loaded."""
        stored = self.mongo_controller.get_circuits()['circuits'].items()
        for circuit_id, circuit in stored:
            if circuit_id in self.circuits:
                continue
            self._load_evc(circuit)
        emit_event(self.controller, "evcs_loaded", content=dict(stored),
                   timeout=1)
959
960 1
    def _load_evc(self, circuit_dict):
        """Build one EVC from its stored dict and register it in memory.

        Returns:
            The EVC, or None when it could not be built (the error is
            logged) or when it is archived.
        """
        try:
            evc = self._evc_from_dict(circuit_dict)
        except (ValueError, KytosTagError) as exception:
            log.error(
                f"Could not load EVC: dict={circuit_dict} error={exception}"
            )
            return None

        if not evc.archived:
            self.circuits.setdefault(evc.id, evc)
            self.sched.add(evc)
            return evc
        return None
975
976 1
    @listen_to("kytos/flow_manager.flow.error")
    def on_flow_mod_error(self, event):
        """Listen to flow_manager flow errors and delegate the handling."""
        self.handle_flow_mod_error(event)
980
981 1
    def handle_flow_mod_error(self, event):
        """Remove the current flows of the EVC whose flow add failed.

        Only errors whose "error_command" is "add" are handled; the EVC is
        looked up from the flow cookie and unknown EVCs are ignored.
        """
        flow = event.content["flow"]
        if event.content.get("error_command") != "add":
            return
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
        if not evc:
            return
        with evc.lock:
            evc.remove_current_flows()
991
992 1
    def _evc_dict_with_instances(self, evc_dict):
993
        """Convert some dict values to instance of EVC classes.
994
995
        This method will convert: [UNI, Link]
996
        """
997 1
        data = evc_dict.copy()  # Do not modify the original dict
998 1
        for attribute, value in data.items():
999
            # Get multiple attributes.
1000
            # Ex: uni_a, uni_z
1001 1
            if "uni" in attribute:
1002 1
                try:
1003 1
                    data[attribute] = self._uni_from_dict(value)
1004 1
                except ValueError as exception:
1005 1
                    result = "Error creating UNI: Invalid value"
1006 1
                    raise ValueError(result) from exception
1007
1008 1
            if attribute == "circuit_scheduler":
1009 1
                data[attribute] = []
1010 1
                for schedule in value:
1011 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
1012
1013
            # Get multiple attributes.
1014
            # Ex: primary_links,
1015
            #     backup_links,
1016
            #     current_links_cache,
1017
            #     primary_links_cache,
1018
            #     backup_links_cache
1019 1
            if "links" in attribute:
1020 1
                data[attribute] = [
1021
                    self._link_from_dict(link) for link in value
1022
                ]
1023
1024
            # Ex: current_path,
1025
            #     primary_path,
1026
            #     backup_path
1027 1
            if "path" in attribute and attribute != "dynamic_backup_path":
1028 1
                data[attribute] = Path(
1029
                    [self._link_from_dict(link) for link in value]
1030
                )
1031
1032 1
        return data
1033
1034 1
    def _evc_from_dict(self, evc_dict):
        """Return an EVC built from ``evc_dict``.

        Dict values are first converted by _evc_dict_with_instances and
        "table_group" is set to this NApp's table_group.
        """
        evc_kwargs = self._evc_dict_with_instances(evc_dict)
        evc_kwargs["table_group"] = self.table_group
        return EVC(self.controller, **evc_kwargs)
1038
1039 1
    def _uni_from_dict(self, uni_dict):
1040
        """Return a UNI object from python dict."""
1041 1
        if uni_dict is None:
1042 1
            return False
1043
1044 1
        interface_id = uni_dict.get("interface_id")
1045 1
        interface = self.controller.get_interface_by_id(interface_id)
1046 1
        if interface is None:
1047 1
            result = (
1048
                "Error creating UNI:"
1049
                + f"Could not instantiate interface {interface_id}"
1050
            )
1051 1
            raise ValueError(result) from ValueError
1052 1
        tag_convert = {1: "vlan"}
1053 1
        tag_dict = uni_dict.get("tag", None)
1054 1
        if tag_dict:
1055 1
            tag_type = tag_dict.get("tag_type")
1056 1
            tag_type = tag_convert.get(tag_type, tag_type)
1057 1
            tag_value = tag_dict.get("value")
1058 1
            if isinstance(tag_value, list):
1059 1
                tag_value = get_tag_ranges(tag_value)
1060 1
                mask_list = get_vlan_tags_and_masks(tag_value)
1061 1
                tag = TAGRange(tag_type, tag_value, mask_list)
1062
            else:
1063 1
                tag = TAG(tag_type, tag_value)
1064
        else:
1065 1
            tag = None
1066 1
        uni = UNI(interface, tag)
1067 1
        return uni
1068
1069 1
    def _link_from_dict(self, link_dict):
1070
        """Return a Link object from python dict."""
1071 1
        id_a = link_dict.get("endpoint_a").get("id")
1072 1
        id_b = link_dict.get("endpoint_b").get("id")
1073
1074 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
1075 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
1076 1
        if not endpoint_a:
1077 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
1078 1
            raise ValueError(error_msg)
1079 1
        if not endpoint_b:
1080
            error_msg = f"Could not get interface endpoint_b id {id_b}"
1081
            raise ValueError(error_msg)
1082
1083 1
        link = Link(endpoint_a, endpoint_b)
1084 1
        if "metadata" in link_dict:
1085 1
            link.extend_metadata(link_dict.get("metadata"))
1086
1087 1
        s_vlan = link.get_metadata("s_vlan")
1088 1
        if s_vlan:
1089 1
            tag = TAG.from_dict(s_vlan)
1090 1
            if tag is False:
1091
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
1092
                raise ValueError(error_msg)
1093 1
            link.update_metadata("s_vlan", tag)
1094 1
        return link
1095
1096 1
    def _find_evc_by_schedule_id(self, schedule_id):
1097
        """
1098
        Find an EVC and CircuitSchedule based on schedule_id.
1099
1100
        :param schedule_id: Schedule ID
1101
        :return: EVC and Schedule
1102
        """
1103 1
        circuits = self._get_circuits_buffer()
1104 1
        found_schedule = None
1105 1
        evc = None
1106
1107
        # pylint: disable=unused-variable
1108 1
        for c_id, circuit in circuits.items():
1109 1
            for schedule in circuit.circuit_scheduler:
1110 1
                if schedule.id == schedule_id:
1111 1
                    found_schedule = schedule
1112 1
                    evc = circuit
1113 1
                    break
1114 1
            if found_schedule:
1115 1
                break
1116 1
        return evc, found_schedule
1117
1118 1
    def _get_circuits_buffer(self):
1119
        """
1120
        Return the circuit buffer.
1121
1122
        If the buffer is empty, try to load data from mongodb.
1123
        """
1124 1
        if not self.circuits:
1125
            # Load circuits from mongodb to buffer
1126 1
            circuits = self.mongo_controller.get_circuits()['circuits']
1127 1
            for c_id, circuit in circuits.items():
1128 1
                evc = self._evc_from_dict(circuit)
1129 1
                self.circuits[c_id] = evc
1130 1
        return self.circuits
1131
1132
    # pylint: disable=attribute-defined-outside-init
1133 1
    @alisten_to("kytos/of_multi_table.enable_table")
    async def on_table_enabled(self, event):
        """Apply the table groups set for mef_eline and confirm them.

        Nothing is changed when the event has no "mef_eline" content or
        when any given group is not in settings.TABLE_GROUP_ALLOWED (an
        error is logged in that case). Otherwise self.table_group is
        updated and "kytos/mef_eline.enable_table" is emitted.
        """
        table_group = event.content.get("mef_eline", None)
        if not table_group:
            return

        # Validate every group before changing anything.
        for group in table_group:
            if group in settings.TABLE_GROUP_ALLOWED:
                continue
            log.error(f'The table group "{group}" is not allowed for '
                      f'mef_eline. Allowed table groups are '
                      f'{settings.TABLE_GROUP_ALLOWED}')
            return

        self.table_group.update(table_group)
        await aemit_event(
            self.controller,
            "kytos/mef_eline.enable_table",
            {"group_table": self.table_group},
        )
1149