Passed
Pull Request — master (#424)
by
unknown
03:51
created

build.main.Main._uni_from_dict()   B

Complexity

Conditions 5

Size

Total Lines 29
Code Lines 24

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 22
CRAP Score 5

Importance

Changes 0
Metric Value
cc 5
eloc 24
nop 2
dl 0
loc 29
ccs 22
cts 22
cp 1
crap 5
rs 8.8373
c 0
b 0
f 0
1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from collections import defaultdict
10 1
from datetime import timezone
11 1
from threading import Lock
12 1
from typing import Optional
13
14 1
from pydantic import ValidationError
15
16 1
from kytos.core import KytosNApp, log, rest
17 1
from kytos.core.events import KytosEvent
18 1
from kytos.core.exceptions import KytosTagError
19 1
from kytos.core.helpers import (alisten_to, listen_to, load_spec, now,
20
                                validate_openapi)
21 1
from kytos.core.interface import TAG, UNI, TAGRange
22 1
from kytos.core.link import Link
23 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
24
                                 get_json_or_400)
25 1
from kytos.core.tag_ranges import get_tag_ranges
26 1
from napps.kytos.mef_eline import controllers, settings
27 1
from napps.kytos.mef_eline.exceptions import (DisabledSwitch,
28
                                              DuplicatedNoTagUNI, InvalidPath)
29 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
30
                                          Path)
31 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
32 1
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
33
                                         emit_event, get_vlan_tags_and_masks,
34
                                         map_evc_event_content)
35
36
37
# pylint: disable=too-many-public-methods
38 1
class Main(KytosNApp):
39
    """Main class of amlight/mef_eline NApp.
40
41
    This class is the entry point for this napp.
42
    """
43
44 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
45
46 1
    def setup(self):
        """Initialize the NApp state in place of ``__init__``.

        The controller calls this method automatically when the application
        is loaded, so every start-up routine belongs here.
        """
        # Scheduler object used to schedule circuit events.
        self.sched = Scheduler()

        # Controller used to save and load circuits; its indexes are
        # bootstrapped as soon as it is created.
        self.mongo_controller = self.get_eline_controller()
        self.mongo_controller.bootstrap_indexes()

        # Set the controller that will manage the dynamic paths.
        DynamicPathManager.set_controller(self.controller)

        # Dictionary of the EVCs created; it acts as a circuit buffer.
        # Every create/update/delete must be synced to mongodb.
        self.circuits = {}

        self.intf_events = defaultdict(dict)
        self.lock_interfaces = defaultdict(Lock)
        self.table_group = {"epl": 0, "evpl": 0}
        self._lock = Lock()
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

        self.load_all_evcs()
        self._topology_updated_at = None
76
77 1
    def get_evcs_by_svc_level(self) -> list:
        """Return the buffered circuits ordered by priority.

        Circuits are sorted by descending service level and, within the same
        level, by ascending creation time.

        In the future, as more ops are offloaded, this should come from the
        DB.
        """
        def priority(evc):
            # Negating the level turns the ascending sort into a descending
            # one for service_level while creation_time stays ascending.
            return -evc.service_level, evc.creation_time

        return sorted(self.circuits.values(), key=priority)
84
85 1
    @staticmethod
    def get_eline_controller():
        """Create and return a new ELineController instance."""
        eline_controller = controllers.ELineController()
        return eline_controller
89
90 1
    def execute(self):
        """Run the consistency routine unless one is already in progress.

        The routine is skipped entirely when ``self._lock`` is currently
        held; otherwise it runs while holding that lock.
        """
        if not self._lock.locked():
            log.debug("Starting consistency routine")
            with self._lock:
                self.execute_consistency()
            log.debug("Finished consistency routine")
98
99 1
    def should_be_checked(self, circuit):
        """Tell whether the circuit qualifies for the consistency check.

        A circuit qualifies when it is enabled but not active, its lock is
        free, it has neither recently removed flows nor a recent update, its
        UNIs are active on the controller switches, and it is either
        intra-switch or has a current path.
        """
        # pylint: disable=too-many-boolean-expressions
        return bool(
            circuit.is_enabled()
            and not circuit.is_active()
            and not circuit.lock.locked()
            and not circuit.has_recent_removed_flow()
            and not circuit.is_recent_updated()
            and circuit.are_unis_active(self.controller.switches)
            # if an inter-switch EVC does not have current_path, it does not
            # make sense to run sdntrace on it
            and (circuit.is_intra_switch() or circuit.current_path)
        )
115
116 1
    def execute_consistency(self):
        """Execute the consistency routine.

        Buffered circuits that pass ``should_be_checked`` have their traces
        checked in one batch. Circuits whose check succeeds are activated
        and synced; the others have ``execution_rounds`` incremented and are
        redeployed once it exceeds ``settings.WAIT_FOR_OLD_PATH``. Circuits
        found in mongodb but missing from the buffer are loaded at the end.
        """
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
        circuits_to_check = []
        for circuit in self.get_evcs_by_svc_level():
            # Whatever is left in stored_circuits after this loop exists in
            # mongodb only.
            stored_circuits.pop(circuit.id, None)
            if self.should_be_checked(circuit):
                circuits_to_check.append(circuit)

        trace_results = EVCDeploy.check_list_traces(circuits_to_check)
        for circuit in circuits_to_check:
            if trace_results.get(circuit.id):
                circuit.execution_rounds = 0
                log.info(f"{circuit} enabled but inactive - activating")
                with circuit.lock:
                    circuit.activate()
                    circuit.sync()
                continue

            circuit.execution_rounds += 1
            if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
                log.info(f"{circuit} enabled but inactive - redeploy")
                with circuit.lock:
                    circuit.deploy()

        for circuit_id, stored_circuit in stored_circuits.items():
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
            self._load_evc(stored_circuit)
142
143 1
    def shutdown(self):
        """Run when the NApp is unloaded.

        There is currently no cleanup to perform; add any cleanup procedure
        here.
        """
148
149 1
    @rest("/v2/evc/", methods=["GET"])
    def list_circuits(self, request: Request) -> JSONResponse:
        """Endpoint to return the stored circuits.

        The ``archived`` query argument (default ``"false"``) selects
        whether archived EVCs are listed, so by default only non archived
        EVCs are returned. Every other query argument is forwarded as a
        metadata filter.
        """
        log.debug("list_circuits /v2/evc")
        query = request.query_params
        archived = query.get("archived", "false").lower()
        metadata = {
            key: value for key, value in query.items() if key != "archived"
        }
        found = self.mongo_controller.get_circuits(archived=archived,
                                                   metadata=metadata)
        return JSONResponse(found['circuits'])
164
165 1
    @rest("/v2/evc/schedule", methods=["GET"])
    def list_schedules(self, _request: Request) -> JSONResponse:
        """Endpoint to return all schedules stored for all circuits.

        Return a JSON with the following template:
        [{"schedule_id": <schedule_id>,
         "circuit_id": <circuit_id>,
         "schedule": <schedule object>}]

        When no circuit is stored at all, an empty JSON object is returned
        instead of a list.
        """
        log.debug("list_schedules /v2/evc/schedule")
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
        status = 200
        if not circuits:
            return JSONResponse({}, status_code=status)

        # Circuits without (or with an empty) circuit_scheduler contribute
        # no entries.
        result = [
            {
                "schedule_id": scheduler.get("id"),
                "circuit_id": circuit.get("id"),
                "schedule": scheduler,
            }
            for circuit in circuits
            for scheduler in circuit.get("circuit_scheduler") or ()
        ]

        log.debug("list_schedules result %s %s", result, status)
        return JSONResponse(result, status_code=status)
196
197 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
    def get_circuit(self, request: Request) -> JSONResponse:
        """Endpoint to return a single circuit, looked up by its id.

        Raises HTTPException 404 when the mongo controller finds no circuit
        for the requested id.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("get_circuit /v2/evc/%s", circuit_id)
        circuit = self.mongo_controller.get_circuit(circuit_id)
        if circuit:
            log.debug("get_circuit result %s %s", circuit, 200)
            return JSONResponse(circuit, status_code=200)

        message = f"circuit_id {circuit_id} not found"
        log.debug("get_circuit result %s %s", message, 404)
        raise HTTPException(404, detail=message)
210
211
    # pylint: disable=too-many-branches, too-many-statements
212 1
    @rest("/v2/evc/", methods=["POST"])
    @validate_openapi(spec)
    def create_circuit(self, request: Request) -> JSONResponse:
        """Try to create a new circuit from the request payload.

        The steps below run in order; the first failure aborts the request:

        - build the EVC from the payload (400 on ValueError/KytosTagError);
        - reject disabled components of the UNIs (409);
        - validate primary_path and backup_path, when given (400);
        - require UNI_A and UNI_Z tag lists to be equal (400);
        - require a primary path or a dynamic path (400);
        - reject duplicated UNIs without tag (409);
        - use the UNI tags (400 on KytosTagError);
        - sync the circuit (400 on ValidationError).

        The circuit is then stored in the circuits buffer and added to the
        scheduler; if it has no schedule it is deployed right away. Finally
        a "created" event is emitted and the user is notified with 201 and
        the new circuit id.
        """
        log.debug("create_circuit /v2/evc/")
        data = get_json_or_400(request, self.controller.loop)

        # Try to create the circuit object
        try:
            evc = self._evc_from_dict(data)
        except (ValueError, KytosTagError) as exception:
            log.debug("create_circuit result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception

        try:
            check_disabled_component(evc.uni_a, evc.uni_z)
        except DisabledSwitch as exception:
            log.debug("create_circuit result %s %s", exception, 409)
            raise HTTPException(
                409,
                detail=f"Path is not valid: {exception}"
            ) from exception

        # Both static paths go through the same validation; only the name
        # used in the error message differs.
        for path_name in ("primary_path", "backup_path"):
            if not getattr(evc, path_name):
                continue
            try:
                getattr(evc, path_name).is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise HTTPException(
                    400,
                    detail=f"{path_name} is not valid: {exception}"
                ) from exception

        if not evc._tag_lists_equal():
            raise HTTPException(
                400,
                detail="UNI_A and UNI_Z tag lists should be the same."
            )

        try:
            evc._validate_has_primary_or_dynamic()
        except ValueError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        try:
            self._check_no_tag_duplication(evc.id, evc.uni_a, evc.uni_z)
        except DuplicatedNoTagUNI as exception:
            log.debug("create_circuit result %s %s", exception, 409)
            raise HTTPException(409, detail=str(exception)) from exception

        try:
            self._use_uni_tags(evc)
        except KytosTagError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        # Save the circuit
        try:
            evc.sync()
        except ValidationError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        # Store the circuit in the buffer and schedule its deploy
        self.circuits[evc.id] = evc
        self.sched.add(evc)

        # A circuit without schedule is deployed now
        if not evc.circuit_scheduler:
            with evc.lock:
                evc.deploy()

        # Notify users
        result = {"circuit_id": evc.id}
        log.debug("create_circuit result %s %s", result, 201)
        emit_event(self.controller, name="created",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=201)
325
326 1
    @staticmethod
    def _use_uni_tags(evc):
        """Use the VLANs of both UNIs of the EVC.

        If using the uni_z VLAN raises KytosTagError, the uni_a VLAN that
        was already taken is made available again before the error is
        re-raised.
        """
        first_uni = evc.uni_a
        evc._use_uni_vlan(first_uni)
        try:
            second_uni = evc.uni_z
            evc._use_uni_vlan(second_uni)
        except KytosTagError as tag_error:
            # Roll back uni_a so a failed request leaves no tag in use.
            evc.make_uni_vlan_available(first_uni)
            raise tag_error
336
337 1
    @listen_to('kytos/flow_manager.flow.removed')
    def on_flow_delete(self, event):
        """Listen to flow removed events and hand them to the handler.

        This keeps track of when flows got removed.
        """
        self.handle_flow_delete(event)
341
342 1
    def handle_flow_delete(self, event):
        """Record on the owning EVC that one of its flows was removed.

        The EVC id is derived from the cookie of the flow carried in the
        event content; flows that map to no buffered EVC are ignored.
        """
        removed_flow = event.content["flow"]
        evc_id = EVC.get_id_from_cookie(removed_flow.cookie)
        evc = self.circuits.get(evc_id)
        if not evc:
            return
        log.debug("Flow removed in EVC %s", evc.id)
        evc.set_flow_removed_at()
349
350 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update(self, request: Request) -> JSONResponse:
        """Update a circuit based on payload.

        The EVC attributes (creation_time, active, current_path,
        failover_path, _id, archived) can't be updated.

        Raises HTTPException:
            404: the circuit id is not in the circuits buffer.
            409: the EVC is archived, a UNI without tag is duplicated, or a
                 component is disabled.
            400: the payload is rejected with ValueError, KytosTagError or
                 ValidationError.

        After a successful update the EVC is removed and/or deployed
        according to the ``enable``/``redeploy`` values returned by
        ``evc.update``, an "updated" event is emitted and the updated EVC is
        returned with status 200.
        """
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = request.path_params["circuit_id"]
        log.debug("update /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("update result %s %s", result, 404)
            # Chain from the caught exception (not the KeyError class) so
            # the original lookup failure is kept as the cause.
            raise HTTPException(404, detail=result) from error

        if evc.archived:
            result = "Can't update archived EVC"
            log.debug("update result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        try:
            updated_data = self._evc_dict_with_instances(data)
            self._check_no_tag_duplication(
                circuit_id, updated_data.get("uni_a"),
                updated_data.get("uni_z")
            )
            enable, redeploy = evc.update(**updated_data)
        except (ValueError, KytosTagError, ValidationError) as exception:
            log.debug("update result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception
        except DuplicatedNoTagUNI as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(409, detail=str(exception)) from exception
        except DisabledSwitch as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(
                    409,
                    detail=f"Path is not valid: {exception}"
                ) from exception

        if evc.is_active():
            if enable is False:  # disable if active
                with evc.lock:
                    evc.remove()
            elif redeploy is not None:  # redeploy if active
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        else:
            if enable is True:  # enable if inactive
                with evc.lock:
                    evc.deploy()
            elif evc.is_enabled() and redeploy:
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        result = {evc.id: evc.as_dict()}
        status = 200

        log.debug("update result %s %s", result, status)
        emit_event(self.controller, "updated",
                   content=map_evc_event_content(evc, **data))
        return JSONResponse(result, status_code=status)
416
417 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
    def delete_circuit(self, request: Request) -> JSONResponse:
        """Remove a circuit.

        While holding the EVC lock, the current and failover flows are
        removed, the EVC is deactivated and disabled, taken out of the
        scheduler, archived, has its UNI tags removed and is then synced.
        A "deleted" event is emitted afterwards.

        Raises HTTPException 404 when the circuit id is not in the circuits
        buffer or the EVC is already archived.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("delete_circuit result %s %s", result, 404)
            # Chain from the caught exception (not the KeyError class) so
            # the original lookup failure is kept as the cause.
            raise HTTPException(404, detail=result) from error

        if evc.archived:
            result = f"Circuit {circuit_id} already removed"
            log.debug("delete_circuit result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        log.info("Removing %s", evc)
        with evc.lock:
            evc.remove_current_flows()
            evc.remove_failover_flows(sync=False)
            evc.deactivate()
            evc.disable()
            self.sched.remove(evc)
            evc.archive()
            evc.remove_uni_tags()
            evc.sync()
        log.info("EVC removed. %s", evc)
        result = {"response": f"Circuit {circuit_id} removed"}
        status = 200

        log.debug("delete_circuit result %s %s", result, status)
        emit_event(self.controller, "deleted",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)
456
457 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
    def get_metadata(self, request: Request) -> JSONResponse:
        """Return the metadata of an EVC held in the circuits buffer.

        Raises HTTPException 404 when the circuit id is unknown.
        """
        circuit_id = request.path_params["circuit_id"]
        try:
            evc = self.circuits[circuit_id]
            response = JSONResponse({"metadata": evc.metadata})
        except KeyError as error:
            not_found = f"circuit_id {circuit_id} not found."
            raise HTTPException(404, detail=not_found) from error
        return response
470
471 1 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
    @validate_openapi(spec)
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to a bulk of EVCs.

        The payload carries ``circuit_ids`` plus the metadata itself. The
        database is updated first, then each buffered EVC is extended.

        Raises HTTPException 404, listing the offending ids, when a
        KeyError occurs for any of them.
        """
        data = get_json_or_400(request, self.controller.loop)
        circuit_ids = data.pop("circuit_ids")

        self.mongo_controller.update_evcs(circuit_ids, data, "add")

        missing_ids = []
        for circuit_id in circuit_ids:
            try:
                self.circuits[circuit_id].extend_metadata(data)
            except KeyError:
                missing_ids.append(circuit_id)

        if not missing_ids:
            return JSONResponse("Operation successful", status_code=201)
        raise HTTPException(404, detail=missing_ids)
491
492 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
    @validate_openapi(spec)
    def add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to an EVC.

        The JSON payload must be an object; it is merged into the metadata
        of the EVC, which is then synced.

        Raises HTTPException:
            400: the payload is not a JSON object.
            404: the circuit id is not in the circuits buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        metadata = get_json_or_400(request, self.controller.loop)
        if not isinstance(metadata, dict):
            # The message must be an f-string, otherwise the client gets the
            # literal "{metadata}" placeholder instead of the rejected value.
            raise HTTPException(400, f"Invalid metadata value: {metadata}")
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error

        evc.extend_metadata(metadata)
        evc.sync()
        return JSONResponse("Operation successful", status_code=201)
511
512 1 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
    @validate_openapi(spec)
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
        """Delete a metadata key from a bulk of EVCs.

        The payload carries ``circuit_ids``; the key comes from the path.
        The database is updated first, then the key is removed from each
        buffered EVC.

        Raises HTTPException 404, listing the offending ids, when a
        KeyError occurs for any of them.
        """
        data = get_json_or_400(request, self.controller.loop)
        key = request.path_params["key"]
        circuit_ids = data.pop("circuit_ids")
        self.mongo_controller.update_evcs(circuit_ids, {key: ""}, "del")

        missing_ids = []
        for circuit_id in circuit_ids:
            try:
                self.circuits[circuit_id].remove_metadata(key)
            except KeyError:
                missing_ids.append(circuit_id)

        if not missing_ids:
            return JSONResponse("Operation successful")
        raise HTTPException(404, detail=missing_ids)
532
533 1
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
    def delete_metadata(self, request: Request) -> JSONResponse:
        """Delete one metadata key from an EVC and sync the EVC.

        Raises HTTPException 404 when the circuit id is not in the circuits
        buffer.
        """
        path_params = request.path_params
        circuit_id = path_params["circuit_id"]
        key = path_params["key"]
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            not_found = f"circuit_id {circuit_id} not found."
            raise HTTPException(404, detail=not_found) from error

        evc.remove_metadata(key)
        evc.sync()
        return JSONResponse("Operation successful")
549
550 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
    def redeploy(self, request: Request) -> JSONResponse:
        """Endpoint to force the redeployment of an EVC.

        An enabled EVC has its current flows removed and is deployed again
        while holding its lock, answering 202. A disabled EVC is left
        untouched and answers 409.

        Raises HTTPException 404 when the circuit id is not in the circuits
        buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            # Chain from the caught exception (not the KeyError class) so
            # the original lookup failure is kept as the cause.
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found"
            ) from error
        if evc.is_enabled():
            with evc.lock:
                evc.remove_current_flows()
                evc.deploy()
            result = {"response": f"Circuit {circuit_id} redeploy received."}
            status = 202
        else:
            result = {"response": f"Circuit {circuit_id} is disabled."}
            status = 409

        return JSONResponse(result, status_code=status)
573
574 1
    @rest("/v2/evc/schedule/", methods=["POST"])
    @validate_openapi(spec)
    def create_schedule(self, request: Request) -> JSONResponse:
        """
        Create a new schedule for a given circuit.

        This service does not check if there are conflicts with another
        schedule.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }

        Raises HTTPException 404 when the circuit is not found and 409 when
        it is archived.
        """
        log.debug("create_schedule /v2/evc/schedule/")
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = data["circuit_id"]
        schedule_data = data["schedule"]

        # Look the EVC up in the circuits buffer
        evc = self._get_circuits_buffer().get(circuit_id)
        if not evc:
            result = f"circuit_id {circuit_id} not found"
            log.debug("create_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # Archived (deleted) circuits can not be modified
        if evc.archived:
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
            log.debug("create_schedule result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        new_schedule = CircuitSchedule.from_dict(schedule_data)

        # Start the schedule list if the circuit has none yet
        if not evc.circuit_scheduler:
            evc.circuit_scheduler = []
        evc.circuit_scheduler.append(new_schedule)

        # Register the job, then save the circuit to mongodb
        self.sched.add_circuit_job(evc, new_schedule)
        evc.sync()

        result = new_schedule.as_dict()
        log.debug("create_schedule result %s %s", result, 201)
        return JSONResponse(result, status_code=201)
635
636 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update_schedule(self, request: Request) -> JSONResponse:
        """Update a schedule.

        Replace the given schedule of an EVC by one built from the payload.
        The schedule ID is preserved.
        Payload example:
            {
              "date": "2019-08-07T14:52:10.967Z",
              "interval": "string",
              "frequency": "1 * * * *",
              "action": "create"
            }

        Raises HTTPException 404 when the schedule is not found and 409 when
        its circuit is archived.
        """
        data = get_json_or_400(request, self.controller.loop)
        schedule_id = request.path_params["schedule_id"]
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)

        # Try to find the circuit owning the schedule
        evc, old_schedule = self._find_evc_by_schedule_id(schedule_id)
        if not old_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("update_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # Archived (deleted) circuits can not be modified
        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("update_schedule result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        replacement = CircuitSchedule.from_dict(data)
        replacement.id = old_schedule.id

        # Swap the old schedule for the modified one
        evc.circuit_scheduler.remove(old_schedule)
        evc.circuit_scheduler.append(replacement)

        # Cancel the old job and register the new one
        self.sched.cancel_job(old_schedule.id)
        self.sched.add_circuit_job(evc, replacement)

        # Save EVC to mongodb
        evc.sync()

        result = replacement.as_dict()
        log.debug("update_schedule result %s %s", result, 200)
        return JSONResponse(result, status_code=200)
687
688 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
    def delete_schedule(self, request: Request) -> JSONResponse:
        """Remove a circuit schedule.

        Remove the schedule from the EVC.
        Cancel the schedule job.
        Sync the EVC.

        Raises HTTPException 404 when the schedule is not found and 409 when
        its circuit is archived.
        """
        schedule_id = request.path_params["schedule_id"]
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
        evc, schedule = self._find_evc_by_schedule_id(schedule_id)

        if not schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("delete_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # Archived (deleted) circuits can not be modified
        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("delete_schedule result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        # Drop the schedule, cancel its job and save the EVC to mongodb
        evc.circuit_scheduler.remove(schedule)
        self.sched.cancel_job(schedule.id)
        evc.sync()

        result = "Schedule removed"
        log.debug("delete_schedule result %s %s", result, 200)
        return JSONResponse(result, status_code=200)
724
725 1
    def _check_no_tag_duplication(
        self,
        evc_id: str,
        uni_a: Optional[UNI] = None,
        uni_z: Optional[UNI] = None
    ):
        """Check the given untagged UNIs against every other circuit.

        The duplication test itself is delegated to
        ``check_no_tag_duplicate`` of each non-archived circuit, which is
        expected to raise DuplicatedNoTagUNI when a duplicate is found.

        Args:
            evc_id (str): ID of the EVC owning the UNIs; the circuit with
                this ID is skipped.
            uni_a (UNI, optional): first UNI to verify.
            uni_z (UNI, optional): second UNI to verify.
        """
        a_untagged = uni_a and not uni_a.user_tag
        z_untagged = uni_z and not uni_z.user_tag
        # Nothing to verify: no UNI given or every given UNI has a tag
        if not (a_untagged or z_untagged):
            return

        # Only UNIs whose user_tag is None are verified on each circuit
        unis = [
            uni for uni in (uni_a, uni_z)
            if uni and uni.user_tag is None
        ]
        for circuit in self.circuits.copy().values():
            if circuit.archived or circuit._id == evc_id:
                continue
            for uni in unis:
                circuit.check_no_tag_duplicate(uni)
750
751 1
    @listen_to("kytos/topology.link_up")
    def on_link_up(self, event):
        """Listen to topology link_up and delegate to handle_link_up."""
        self.handle_link_up(event)
755
756 1
    def handle_link_up(self, event):
        """Tell every enabled, non-archived EVC that a link is up.

        Each EVC handles the link while its own lock is held.
        """
        link = event.content["link"]
        log.info("Event handle_link_up %s", link)
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_enabled() or evc.archived:
                continue
            with evc.lock:
                evc.handle_link_up(link)
763
764
    # Possibly replace this with interruptions?
765 1
    @listen_to(
        '.*.switch.interface.(link_up|link_down|created|deleted)'
    )
    def on_interface_link_change(self, event: KytosEvent):
        """
        Handler for interface link_up, link_down, created and deleted events.

        To avoid multiple database updates (link flap):
        First check the time when interface was last updated; the event is
        handled right away when there was no update yet or the last one is
        at least 100 ms old.
        Second, after waiting 100 ms, check the time when the last interface
        event was received; the event is handled again unless another event
        was received less than 100 ms ago.
        Last event received always updates database.
        """
        iface = event.content.get("interface")
        self.intf_events[iface.id].update({"received": now()})
        with self.lock_interfaces[iface.id]:
            last_updated = self.intf_events[iface.id].get("updated")
            _now = now()
            if last_updated:
                # Normalize the timezone BEFORE subtracting, as done for
                # last_received below
                last_updated = last_updated.replace(tzinfo=timezone.utc)
            if (
                not last_updated
                or int((_now - last_updated).total_seconds() * 1000) >= 100
            ):
                self.intf_events[iface.id].update({"updated": _now})
                self.handle_on_interface_link_change(event)

        time.sleep(0.1)
        with self.lock_interfaces[iface.id]:
            last_received = self.intf_events[iface.id].get("received")
            _now = now()
            last_received = last_received.replace(tzinfo=timezone.utc)
            # A newer event arrived meanwhile: leave the update to it
            if int((_now - last_received).total_seconds() * 1000) < 100:
                return
            self.intf_events[iface.id].update({"updated": _now})
            self.handle_on_interface_link_change(event)
799
800 1
    def handle_on_interface_link_change(self, event: KytosEvent):
        """
        Dispatch an interface event according to the last part of its name.

        'link_up' and 'created' go to handle_interface_link_up, 'link_down'
        and 'deleted' go to handle_interface_link_down; any other event
        type is ignored.
        """
        event_type = event.name.rpartition('.')[2]
        iface = event.content.get("interface")
        handlers = {
            'link_up': self.handle_interface_link_up,
            'created': self.handle_interface_link_up,
            'link_down': self.handle_interface_link_down,
            'deleted': self.handle_interface_link_down,
        }
        handler = handlers.get(event_type)
        if handler is not None:
            handler(iface)
810
811 1
    def handle_interface_link_up(self, interface):
        """
        Handler for interface link_up events.

        Every EVC returned by get_evcs_by_svc_level() is notified through
        its own handle_interface_link_up().
        """
        # Log the event once, not once per EVC
        log.info("Event handle_interface_link_up %s", interface)
        for evc in self.get_evcs_by_svc_level():
            evc.handle_interface_link_up(interface)
820
821 1
    def handle_interface_link_down(self, interface):
        """
        Handler for interface link_down events.

        Every EVC returned by get_evcs_by_svc_level() is notified through
        its own handle_interface_link_down().
        """
        # Log the event once, not once per EVC
        log.info("Event handle_interface_link_down %s", interface)
        for evc in self.get_evcs_by_svc_level():
            evc.handle_interface_link_down(interface)
830
831 1
    @listen_to("kytos/topology.link_down")
    def on_link_down(self, event):
        """Listen to topology link_down and delegate to handle_link_down."""
        self.handle_link_down(event)
835
836 1
    def handle_link_down(self, event):  # pylint: disable=too-many-branches
        """Handle a link down (or under maintenance) event for all EVCs.

        Affected EVCs with a usable failover path have their failover flows
        sent to flow_manager in batches and then swap current and failover
        paths. The other affected EVCs are announced through
        "evc_affected_by_link_down" events. EVCs that are not affected but
        whose failover path uses the link get a new failover path set up.
        """
        link = event.content["link"]
        log.info("Event handle_link_down %s", link)
        switch_flows = {}
        evcs_with_failover = []
        evcs_normal = []
        check_failover = []
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_affected_by_link(link):
                check_failover.append(evc)
                continue
            # If there is no usable failover path, handle the link down the
            # traditional way
            failover = getattr(evc, 'failover_path', None)
            if not failover or evc.is_failover_path_affected_by_link(link):
                evcs_normal.append(evc)
                continue
            try:
                dpid_flows = evc.get_failover_flows()
            # pylint: disable=broad-except
            except Exception:
                err = traceback.format_exc().replace("\n", ", ")
                log.error(
                    f"Ignore Failover path for {evc} due to error: {err}"
                )
                evcs_normal.append(evc)
                continue
            for dpid, flows in dpid_flows.items():
                switch_flows.setdefault(dpid, []).extend(flows)
            evcs_with_failover.append(evc)

        # Send the failover flows, at most BATCH_SIZE per switch per round
        while switch_flows:
            batch_size = settings.BATCH_SIZE or None
            for dpid in list(switch_flows):
                pending = switch_flows[dpid]
                emit_event(
                    self.controller,
                    context="kytos.flow_manager",
                    name="flows.install",
                    content={
                        "dpid": dpid,
                        "flow_dict": {"flows": pending[:batch_size]},
                    }
                )
                if batch_size is None or batch_size >= len(pending):
                    del switch_flows[dpid]
                else:
                    switch_flows[dpid] = pending[batch_size:]
            time.sleep(settings.BATCH_INTERVAL)

        # The failover path becomes the current path and vice versa
        for evc in evcs_with_failover:
            with evc.lock:
                previous_path = evc.current_path
                new_path = evc.failover_path
                evc.current_path = new_path
                evc.failover_path = previous_path
                evc.sync()
            emit_event(self.controller, "redeployed_link_down",
                       content=map_evc_event_content(evc))
            log.info(
                f"{evc} redeployed with failover due to link down {link.id}"
            )

        for evc in evcs_normal:
            emit_event(
                self.controller,
                "evc_affected_by_link_down",
                content={"link_id": link.id} | map_evc_event_content(evc)
            )

        # After handling the hot path, check if new failover paths are needed.
        # Note that EVCs affected by link down will generate a KytosEvent for
        # deployed|redeployed, which will trigger the failover path setup.
        # Thus, we just need to further check the check_failover list
        for evc in check_failover:
            if not evc.is_failover_path_affected_by_link(link):
                continue
            with evc.lock:
                evc.setup_failover_path()
917
918 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
    def on_evc_affected_by_link_down(self, event):
        """Delegate the event to handle_evc_affected_by_link_down."""
        self.handle_evc_affected_by_link_down(event)
922
923 1
    def handle_evc_affected_by_link_down(self, event):
        """Let the EVC named in the event handle a link down.

        Emits "redeployed_link_down" when evc.handle_link_down() returns a
        truthy result and "error_redeploy_link_down" otherwise. Events for
        unknown EVC IDs are ignored.
        """
        content = event.content
        evc = self.circuits.get(content["evc_id"])
        link_id = content['link_id']
        if not evc:
            return
        with evc.lock:
            redeployed = evc.handle_link_down()
        if redeployed:
            log.info(f"{evc} redeployed due to link down {link_id}")
        event_name = (
            "redeployed_link_down" if redeployed
            else "error_redeploy_link_down"
        )
        emit_event(self.controller, event_name,
                   content=map_evc_event_content(evc))
937
938 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
    def on_evc_deployed(self, event):
        """Handle EVC deployed|redeployed_link_up|redeployed_link_down."""
        self.handle_evc_deployed(event)
942
943 1
    def handle_evc_deployed(self, event):
        """Set up the failover path of the EVC referenced by the event.

        Events whose "evc_id" is not a known circuit are ignored.
        """
        evc = self.circuits.get(event.content["evc_id"])
        if evc:
            with evc.lock:
                evc.setup_failover_path()
950
951 1
    @listen_to("kytos/topology.topology_loaded")
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
        """Call load_all_evcs when the topology_loaded event arrives."""
        self.load_all_evcs()
955
956 1
    def load_all_evcs(self):
        """Load the stored EVCs that are not in memory yet.

        Afterwards an "evcs_loaded" event carrying all the stored circuits
        is emitted.
        """
        stored = self.mongo_controller.get_circuits()['circuits']
        for circuit_id, circuit in stored.items():
            if circuit_id in self.circuits:
                continue
            self._load_evc(circuit)
        emit_event(self.controller, "evcs_loaded", content=dict(stored),
                   timeout=1)
964
965 1
    def _load_evc(self, circuit_dict):
966
        """Load one EVC from mongodb to memory."""
967 1
        try:
968 1
            evc = self._evc_from_dict(circuit_dict)
969 1
        except (ValueError, KytosTagError) as exception:
970 1
            log.error(
971
                f"Could not load EVC: dict={circuit_dict} error={exception}"
972
            )
973 1
            return None
974 1
        if evc.archived:
975 1
            return None
976
977 1
        self.circuits.setdefault(evc.id, evc)
978 1
        self.sched.add(evc)
979 1
        return evc
980
981 1
    @listen_to("kytos/flow_manager.flow.error")
    def on_flow_mod_error(self, event):
        """Delegate flow_manager flow errors to handle_flow_mod_error."""
        self.handle_flow_mod_error(event)
985
986 1
    def handle_flow_mod_error(self, event):
        """Remove the current flows of the EVC whose flow failed to be added.

        Only errors whose "error_command" is "add" are handled; the EVC is
        looked up from the cookie of the failed flow.
        """
        flow = event.content["flow"]
        if event.content.get("error_command") != "add":
            return
        evc_id = EVC.get_id_from_cookie(flow.cookie)
        evc = self.circuits.get(evc_id)
        if not evc:
            return
        with evc.lock:
            evc.remove_current_flows()
996
997 1
    def _evc_dict_with_instances(self, evc_dict):
998
        """Convert some dict values to instance of EVC classes.
999
1000
        This method will convert: [UNI, Link]
1001
        """
1002 1
        data = evc_dict.copy()  # Do not modify the original dict
1003 1
        for attribute, value in data.items():
1004
            # Get multiple attributes.
1005
            # Ex: uni_a, uni_z
1006 1
            if "uni" in attribute:
1007 1
                try:
1008 1
                    data[attribute] = self._uni_from_dict(value)
1009 1
                except ValueError as exception:
1010 1
                    result = "Error creating UNI: Invalid value"
1011 1
                    raise ValueError(result) from exception
1012
1013 1
            if attribute == "circuit_scheduler":
1014 1
                data[attribute] = []
1015 1
                for schedule in value:
1016 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
1017
1018
            # Get multiple attributes.
1019
            # Ex: primary_links,
1020
            #     backup_links,
1021
            #     current_links_cache,
1022
            #     primary_links_cache,
1023
            #     backup_links_cache
1024 1
            if "links" in attribute:
1025 1
                data[attribute] = [
1026
                    self._link_from_dict(link) for link in value
1027
                ]
1028
1029
            # Ex: current_path,
1030
            #     primary_path,
1031
            #     backup_path
1032 1
            if "path" in attribute and attribute != "dynamic_backup_path":
1033 1
                data[attribute] = Path(
1034
                    [self._link_from_dict(link) for link in value]
1035
                )
1036
1037 1
        return data
1038
1039 1
    def _evc_from_dict(self, evc_dict):
        """Return an EVC built from a dict, using this NApp's table_group."""
        kwargs = self._evc_dict_with_instances(evc_dict)
        kwargs["table_group"] = self.table_group
        return EVC(self.controller, **kwargs)
1043
1044 1
    def _uni_from_dict(self, uni_dict):
1045
        """Return a UNI object from python dict."""
1046 1
        if uni_dict is None:
1047 1
            return False
1048
1049 1
        interface_id = uni_dict.get("interface_id")
1050 1
        interface = self.controller.get_interface_by_id(interface_id)
1051 1
        if interface is None:
1052 1
            result = (
1053
                "Error creating UNI:"
1054
                + f"Could not instantiate interface {interface_id}"
1055
            )
1056 1
            raise ValueError(result) from ValueError
1057 1
        tag_convert = {1: "vlan"}
1058 1
        tag_dict = uni_dict.get("tag", None)
1059 1
        if tag_dict:
1060 1
            tag_type = tag_dict.get("tag_type")
1061 1
            tag_type = tag_convert.get(tag_type, tag_type)
1062 1
            tag_value = tag_dict.get("value")
1063 1
            if isinstance(tag_value, list):
1064 1
                tag_value = get_tag_ranges(tag_value)
1065 1
                mask_list = get_vlan_tags_and_masks(tag_value)
1066 1
                tag = TAGRange(tag_type, tag_value, mask_list)
1067
            else:
1068 1
                tag = TAG(tag_type, tag_value)
1069
        else:
1070 1
            tag = None
1071 1
        uni = UNI(interface, tag)
1072 1
        return uni
1073
1074 1
    def _link_from_dict(self, link_dict):
1075
        """Return a Link object from python dict."""
1076 1
        id_a = link_dict.get("endpoint_a").get("id")
1077 1
        id_b = link_dict.get("endpoint_b").get("id")
1078
1079 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
1080 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
1081 1
        if not endpoint_a:
1082 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
1083 1
            raise ValueError(error_msg)
1084 1
        if not endpoint_b:
1085
            error_msg = f"Could not get interface endpoint_b id {id_b}"
1086
            raise ValueError(error_msg)
1087
1088 1
        link = Link(endpoint_a, endpoint_b)
1089 1
        if "metadata" in link_dict:
1090 1
            link.extend_metadata(link_dict.get("metadata"))
1091
1092 1
        s_vlan = link.get_metadata("s_vlan")
1093 1
        if s_vlan:
1094 1
            tag = TAG.from_dict(s_vlan)
1095 1
            if tag is False:
1096
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
1097
                raise ValueError(error_msg)
1098 1
            link.update_metadata("s_vlan", tag)
1099 1
        return link
1100
1101 1
    def _find_evc_by_schedule_id(self, schedule_id):
1102
        """
1103
        Find an EVC and CircuitSchedule based on schedule_id.
1104
1105
        :param schedule_id: Schedule ID
1106
        :return: EVC and Schedule
1107
        """
1108 1
        circuits = self._get_circuits_buffer()
1109 1
        found_schedule = None
1110 1
        evc = None
1111
1112
        # pylint: disable=unused-variable
1113 1
        for c_id, circuit in circuits.items():
1114 1
            for schedule in circuit.circuit_scheduler:
1115 1
                if schedule.id == schedule_id:
1116 1
                    found_schedule = schedule
1117 1
                    evc = circuit
1118 1
                    break
1119 1
            if found_schedule:
1120 1
                break
1121 1
        return evc, found_schedule
1122
1123 1
    def _get_circuits_buffer(self):
1124
        """
1125
        Return the circuit buffer.
1126
1127
        If the buffer is empty, try to load data from mongodb.
1128
        """
1129 1
        if not self.circuits:
1130
            # Load circuits from mongodb to buffer
1131 1
            circuits = self.mongo_controller.get_circuits()['circuits']
1132 1
            for c_id, circuit in circuits.items():
1133 1
                evc = self._evc_from_dict(circuit)
1134 1
                self.circuits[c_id] = evc
1135 1
        return self.circuits
1136
1137
    # pylint: disable=attribute-defined-outside-init
1138 1
    @alisten_to("kytos/of_multi_table.enable_table")
    async def on_table_enabled(self, event):
        """Update table_group from an of_multi_table enable_table event.

        The "mef_eline" entry of the event content is merged into
        self.table_group and "kytos/mef_eline.enable_table" is emitted.
        Nothing is changed when that entry is missing or empty, or when it
        has a group that is not in settings.TABLE_GROUP_ALLOWED.
        """
        table_group = event.content.get("mef_eline", None)
        if not table_group:
            return
        rejected = [
            name for name in table_group
            if name not in settings.TABLE_GROUP_ALLOWED
        ]
        if rejected:
            # Only the first group that is not allowed is reported
            group = rejected[0]
            log.error(f'The table group "{group}" is not allowed for '
                      f'mef_eline. Allowed table groups are '
                      f'{settings.TABLE_GROUP_ALLOWED}')
            return
        self.table_group.update(table_group)
        await aemit_event(
            self.controller,
            "kytos/mef_eline.enable_table",
            {"group_table": self.table_group},
        )
1154