Passed
Pull Request — master (#491)
by Aldo
05:09
created

build.main.Main.load_all_evcs()   A

Complexity

Conditions 3

Size

Total Lines 8
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 3

Importance

Changes 0
Metric Value
cc 3
eloc 7
nop 1
dl 0
loc 8
ccs 6
cts 6
cp 1
crap 3
rs 10
c 0
b 0
f 0
1
# pylint: disable=protected-access, too-many-lines
2
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from collections import defaultdict
10 1
from threading import Lock
11 1
from typing import Optional
12
13 1
from pydantic import ValidationError
14
15 1
from kytos.core import KytosNApp, log, rest
16 1
from kytos.core.events import KytosEvent
17 1
from kytos.core.exceptions import KytosTagError
18 1
from kytos.core.helpers import (alisten_to, listen_to, load_spec, now,
19
                                validate_openapi)
20 1
from kytos.core.interface import TAG, UNI, TAGRange
21 1
from kytos.core.link import Link
22 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
23
                                 get_json_or_400)
24 1
from kytos.core.tag_ranges import get_tag_ranges
25 1
from napps.kytos.mef_eline import controllers, settings
26 1
from napps.kytos.mef_eline.exceptions import (DisabledSwitch,
27
                                              DuplicatedNoTagUNI, InvalidPath)
28 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
29
                                          Path)
30 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
31 1
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
32
                                         emit_event, get_vlan_tags_and_masks,
33
                                         map_evc_event_content,
34
                                         merge_flow_dicts,
35
                                         send_flow_mods_event)
36
37
38
# pylint: disable=too-many-public-methods
39 1
class Main(KytosNApp):
40
    """Main class of amlight/mef_eline NApp.
41
42
    This class is the entry point for this napp.
43
    """
44
45 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
46
47 1
    def setup(self):
48
        """Replace the '__init__' method for the KytosNApp subclass.
49
50
        The setup method is automatically called by the controller when your
51
        application is loaded.
52
53
        So, if you have any setup routine, insert it here.
54
        """
55
        # object used to scheduler circuit events
56 1
        self.sched = Scheduler()
57
58
        # object to save and load circuits
59 1
        self.mongo_controller = self.get_eline_controller()
60 1
        self.mongo_controller.bootstrap_indexes()
61
62
        # set the controller that will manager the dynamic paths
63 1
        DynamicPathManager.set_controller(self.controller)
64
65
        # dictionary of EVCs created. It acts as a circuit buffer.
66
        # Every create/update/delete must be synced to mongodb.
67 1
        self.circuits = {}
68
69 1
        self._intf_events = defaultdict(dict)
70 1
        self._lock_interfaces = defaultdict(Lock)
71 1
        self.table_group = {"epl": 0, "evpl": 0}
72 1
        self._lock = Lock()
73 1
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)
74
75 1
        self.load_all_evcs()
76 1
        self._topology_updated_at = None
77
78 1
    def get_evcs_by_svc_level(self, enable_filter: bool = True) -> list:
79
        """Get circuits sorted by desc service level and asc creation_time.
80
81
        In the future, as more ops are offloaded it should be get from the DB.
82
        """
83 1
        if enable_filter:
84 1
            return sorted(
85
                          [circuit for circuit in self.circuits.values()
86
                           if circuit.is_enabled()],
87
                          key=lambda x: (-x.service_level, x.creation_time),
88
            )
89 1
        return sorted(self.circuits.values(),
90
                      key=lambda x: (-x.service_level, x.creation_time))
91
92 1
    @staticmethod
93 1
    def get_eline_controller():
94
        """Return the ELineController instance."""
95
        return controllers.ELineController()
96
97 1
    def execute(self):
98
        """Execute once when the napp is running."""
99 1
        if self._lock.locked():
100 1
            return
101 1
        log.debug("Starting consistency routine")
102 1
        with self._lock:
103 1
            self.execute_consistency()
104 1
        log.debug("Finished consistency routine")
105
106 1
    def should_be_checked(self, circuit):
107
        "Verify if the circuit meets the necessary conditions to be checked"
108
        # pylint: disable=too-many-boolean-expressions
109 1
        if (
110
                circuit.is_enabled()
111
                and not circuit.is_active()
112
                and not circuit.lock.locked()
113
                and not circuit.has_recent_removed_flow()
114
                and not circuit.is_recent_updated()
115
                and circuit.are_unis_active(self.controller.switches)
116
                # if a inter-switch EVC does not have current_path, it does not
117
                # make sense to run sdntrace on it
118
                and (circuit.is_intra_switch() or circuit.current_path)
119
                ):
120 1
            return True
121
        return False
122
123 1
    def execute_consistency(self):
124
        """Execute consistency routine."""
125 1
        circuits_to_check = []
126 1
        for circuit in self.get_evcs_by_svc_level(enable_filter=False):
127 1
            if self.should_be_checked(circuit):
128 1
                circuits_to_check.append(circuit)
129 1
            circuit.try_setup_failover_path()
130 1
        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
131 1
        for circuit in circuits_to_check:
132 1
            is_checked = circuits_checked.get(circuit.id)
133 1
            if is_checked:
134 1
                circuit.execution_rounds = 0
135 1
                log.info(f"{circuit} enabled but inactive - activating")
136 1
                with circuit.lock:
137 1
                    circuit.activate()
138 1
                    circuit.sync()
139
            else:
140 1
                circuit.execution_rounds += 1
141 1
                if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
142 1
                    log.info(f"{circuit} enabled but inactive - redeploy")
143 1
                    with circuit.lock:
144 1
                        circuit.deploy()
145
146 1
    def shutdown(self):
147
        """Execute when your napp is unloaded.
148
149
        If you have some cleanup procedure, insert it here.
150
        """
151
152 1
    @rest("/v2/evc/", methods=["GET"])
153 1
    def list_circuits(self, request: Request) -> JSONResponse:
154
        """Endpoint to return circuits stored.
155
156
        archive query arg if defined (not null) will be filtered
157
        accordingly, by default only non archived evcs will be listed
158
        """
159 1
        log.debug("list_circuits /v2/evc")
160 1
        args = request.query_params
161 1
        archived = args.get("archived", "false").lower()
162 1
        args = {k: v for k, v in args.items() if k not in {"archived"}}
163 1
        circuits = self.mongo_controller.get_circuits(archived=archived,
164
                                                      metadata=args)
165 1
        circuits = circuits['circuits']
166 1
        return JSONResponse(circuits)
167
168 1
    @rest("/v2/evc/schedule", methods=["GET"])
169 1
    def list_schedules(self, _request: Request) -> JSONResponse:
170
        """Endpoint to return all schedules stored for all circuits.
171
172
        Return a JSON with the following template:
173
        [{"schedule_id": <schedule_id>,
174
         "circuit_id": <circuit_id>,
175
         "schedule": <schedule object>}]
176
        """
177 1
        log.debug("list_schedules /v2/evc/schedule")
178 1
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
179 1
        if not circuits:
180 1
            result = {}
181 1
            status = 200
182 1
            return JSONResponse(result, status_code=status)
183
184 1
        result = []
185 1
        status = 200
186 1
        for circuit in circuits:
187 1
            circuit_scheduler = circuit.get("circuit_scheduler")
188 1
            if circuit_scheduler:
189 1
                for scheduler in circuit_scheduler:
190 1
                    value = {
191
                        "schedule_id": scheduler.get("id"),
192
                        "circuit_id": circuit.get("id"),
193
                        "schedule": scheduler,
194
                    }
195 1
                    result.append(value)
196
197 1
        log.debug("list_schedules result %s %s", result, status)
198 1
        return JSONResponse(result, status_code=status)
199
200 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
201 1
    def get_circuit(self, request: Request) -> JSONResponse:
202
        """Endpoint to return a circuit based on id."""
203 1
        circuit_id = request.path_params["circuit_id"]
204 1
        log.debug("get_circuit /v2/evc/%s", circuit_id)
205 1
        circuit = self.mongo_controller.get_circuit(circuit_id)
206 1
        if not circuit:
207 1
            result = f"circuit_id {circuit_id} not found"
208 1
            log.debug("get_circuit result %s %s", result, 404)
209 1
            raise HTTPException(404, detail=result)
210 1
        status = 200
211 1
        log.debug("get_circuit result %s %s", circuit, status)
212 1
        return JSONResponse(circuit, status_code=status)
213
214
    # pylint: disable=too-many-branches, too-many-statements
215 1
    @rest("/v2/evc/", methods=["POST"])
216 1
    @validate_openapi(spec)
217 1
    def create_circuit(self, request: Request) -> JSONResponse:
218
        """Try to create a new circuit.
219
220
        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
221
        UNI_Z's requested C-VID are available from the interfaces' pools. This
222
        is checked when creating the UNI object.
223
224
        Then, E-Line NApp requests a primary and a backup path to the
225
        Pathfinder NApp using the attributes primary_links and backup_links
226
        submitted via REST
227
228
        # For each link composing paths in #3:
229
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
230
        #  - Using the S-VID obtained, generate abstract flow entries to be
231
        #    sent to FlowManager
232
233
        Push abstract flow entries to FlowManager and FlowManager pushes
234
        OpenFlow entries to datapaths
235
236
        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
237
        creation
238
239
        Finnaly, notify user of the status of its request.
240
        """
241
        # Try to create the circuit object
242 1
        log.debug("create_circuit /v2/evc/")
243 1
        data = get_json_or_400(request, self.controller.loop)
244
245 1
        try:
246 1
            evc = self._evc_from_dict(data)
247 1
        except (ValueError, KytosTagError) as exception:
248 1
            log.debug("create_circuit result %s %s", exception, 400)
249 1
            raise HTTPException(400, detail=str(exception)) from exception
250 1
        try:
251 1
            check_disabled_component(evc.uni_a, evc.uni_z)
252 1
        except DisabledSwitch as exception:
253 1
            log.debug("create_circuit result %s %s", exception, 409)
254 1
            raise HTTPException(
255
                    409,
256
                    detail=f"Path is not valid: {exception}"
257
                ) from exception
258
259 1
        if evc.primary_path:
260 1
            try:
261 1
                evc.primary_path.is_valid(
262
                    evc.uni_a.interface.switch,
263
                    evc.uni_z.interface.switch,
264
                    bool(evc.circuit_scheduler),
265
                )
266 1
            except InvalidPath as exception:
267 1
                raise HTTPException(
268
                    400,
269
                    detail=f"primary_path is not valid: {exception}"
270
                ) from exception
271 1
        if evc.backup_path:
272 1
            try:
273 1
                evc.backup_path.is_valid(
274
                    evc.uni_a.interface.switch,
275
                    evc.uni_z.interface.switch,
276
                    bool(evc.circuit_scheduler),
277
                )
278 1
            except InvalidPath as exception:
279 1
                raise HTTPException(
280
                    400,
281
                    detail=f"backup_path is not valid: {exception}"
282
                ) from exception
283
284 1
        if not evc._tag_lists_equal():
285 1
            detail = "UNI_A and UNI_Z tag lists should be the same."
286 1
            raise HTTPException(400, detail=detail)
287
288 1
        try:
289 1
            evc._validate_has_primary_or_dynamic()
290 1
        except ValueError as exception:
291 1
            raise HTTPException(400, detail=str(exception)) from exception
292
293 1
        try:
294 1
            self._check_no_tag_duplication(evc.id, evc.uni_a, evc.uni_z)
295
        except DuplicatedNoTagUNI as exception:
296
            log.debug("create_circuit result %s %s", exception, 409)
297
            raise HTTPException(409, detail=str(exception)) from exception
298
299 1
        try:
300 1
            self._use_uni_tags(evc)
301 1
        except KytosTagError as exception:
302 1
            raise HTTPException(400, detail=str(exception)) from exception
303
304
        # save circuit
305 1
        try:
306 1
            evc.sync()
307
        except ValidationError as exception:
308
            raise HTTPException(400, detail=str(exception)) from exception
309
310
        # store circuit in dictionary
311 1
        self.circuits[evc.id] = evc
312
313
        # Schedule the circuit deploy
314 1
        self.sched.add(evc)
315
316
        # Circuit has no schedule, deploy now
317 1
        deployed = False
318 1
        if not evc.circuit_scheduler:
319 1
            with evc.lock:
320 1
                deployed = evc.deploy()
321
322
        # Notify users
323 1
        result = {"circuit_id": evc.id, "deployed": deployed}
324 1
        status = 201
325 1
        log.debug("create_circuit result %s %s", result, status)
326 1
        emit_event(self.controller, name="created",
327
                   content=map_evc_event_content(evc))
328 1
        return JSONResponse(result, status_code=status)
329
330 1
    @staticmethod
331 1
    def _use_uni_tags(evc):
332 1
        uni_a = evc.uni_a
333 1
        evc._use_uni_vlan(uni_a)
334 1
        try:
335 1
            uni_z = evc.uni_z
336 1
            evc._use_uni_vlan(uni_z)
337 1
        except KytosTagError as err:
338 1
            evc.make_uni_vlan_available(uni_a)
339 1
            raise err
340
341 1
    @listen_to('kytos/flow_manager.flow.removed')
342 1
    def on_flow_delete(self, event):
343
        """Capture delete messages to keep track when flows got removed."""
344
        self.handle_flow_delete(event)
345
346 1
    def handle_flow_delete(self, event):
347
        """Keep track when the EVC got flows removed by deriving its cookie."""
348 1
        flow = event.content["flow"]
349 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
350 1
        if evc:
351 1
            log.debug("Flow removed in EVC %s", evc.id)
352 1
            evc.set_flow_removed_at()
353
354 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
355 1
    @validate_openapi(spec)
356 1
    def update(self, request: Request) -> JSONResponse:
357
        """Update a circuit based on payload.
358
359
        The EVC attributes (creation_time, active, current_path,
360
        failover_path, _id, archived) can't be updated.
361
        """
362 1
        data = get_json_or_400(request, self.controller.loop)
363 1
        circuit_id = request.path_params["circuit_id"]
364 1
        log.debug("update /v2/evc/%s", circuit_id)
365 1
        try:
366 1
            evc = self.circuits[circuit_id]
367 1
        except KeyError:
368 1
            result = f"circuit_id {circuit_id} not found"
369 1
            log.debug("update result %s %s", result, 404)
370 1
            raise HTTPException(404, detail=result) from KeyError
371
372 1
        try:
373 1
            updated_data = self._evc_dict_with_instances(data)
374 1
            self._check_no_tag_duplication(
375
                circuit_id, updated_data.get("uni_a"),
376
                updated_data.get("uni_z")
377
            )
378 1
            enable, redeploy = evc.update(**updated_data)
379 1
        except (ValueError, KytosTagError, ValidationError) as exception:
380 1
            log.debug("update result %s %s", exception, 400)
381 1
            raise HTTPException(400, detail=str(exception)) from exception
382 1
        except DuplicatedNoTagUNI as exception:
383
            log.debug("update result %s %s", exception, 409)
384
            raise HTTPException(409, detail=str(exception)) from exception
385 1
        except DisabledSwitch as exception:
386 1
            log.debug("update result %s %s", exception, 409)
387 1
            raise HTTPException(
388
                    409,
389
                    detail=f"Path is not valid: {exception}"
390
                ) from exception
391 1
        redeployed = False
392 1
        if evc.is_active():
393
            if enable is False:  # disable if active
394
                with evc.lock:
395
                    evc.remove()
396
            elif redeploy is not None:  # redeploy if active
397
                with evc.lock:
398
                    evc.remove()
399
                    redeployed = evc.deploy()
400
        else:
401 1
            if enable is True:  # enable if inactive
402 1
                with evc.lock:
403 1
                    redeployed = evc.deploy()
404 1
            elif evc.is_enabled() and redeploy:
405 1
                with evc.lock:
406 1
                    evc.remove()
407 1
                    redeployed = evc.deploy()
408 1
        result = {evc.id: evc.as_dict(), 'redeployed': redeployed}
409 1
        status = 200
410
411 1
        log.debug("update result %s %s", result, status)
412 1
        emit_event(self.controller, "updated",
413
                   content=map_evc_event_content(evc, **data))
414 1
        return JSONResponse(result, status_code=status)
415
416 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
417 1
    def delete_circuit(self, request: Request) -> JSONResponse:
418
        """Remove a circuit.
419
420
        First, the flows are removed from the switches, and then the EVC is
421
        disabled.
422
        """
423 1
        circuit_id = request.path_params["circuit_id"]
424 1
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
425 1
        try:
426 1
            evc = self.circuits.pop(circuit_id)
427 1
        except KeyError:
428 1
            result = f"circuit_id {circuit_id} not found"
429 1
            log.debug("delete_circuit result %s %s", result, 404)
430 1
            raise HTTPException(404, detail=result) from KeyError
431 1
        log.info("Removing %s", evc)
432
433 1
        with evc.lock:
434 1
            if not evc.archived:
435 1
                evc.deactivate()
436 1
                evc.disable()
437 1
                self.sched.remove(evc)
438 1
                evc.remove_current_flows(sync=False)
439 1
                evc.remove_failover_flows(sync=False)
440 1
                evc.archive()
441 1
                evc.remove_uni_tags()
442 1
                evc.sync()
443 1
                emit_event(
444
                    self.controller, "deleted",
445
                    content=map_evc_event_content(evc)
446
                )
447
448 1
        log.info("EVC removed. %s", evc)
449 1
        result = {"response": f"Circuit {circuit_id} removed"}
450 1
        status = 200
451 1
        log.debug("delete_circuit result %s %s", result, status)
452
453 1
        return JSONResponse(result, status_code=status)
454
455 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
456 1
    def get_metadata(self, request: Request) -> JSONResponse:
457
        """Get metadata from an EVC."""
458 1
        circuit_id = request.path_params["circuit_id"]
459 1
        try:
460 1
            return (
461
                JSONResponse({"metadata": self.circuits[circuit_id].metadata})
462
            )
463
        except KeyError as error:
464
            raise HTTPException(
465
                404,
466
                detail=f"circuit_id {circuit_id} not found."
467
            ) from error
468
469 1 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
470 1
    @validate_openapi(spec)
471 1
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
472
        """Add metadata to a bulk of EVCs."""
473 1
        data = get_json_or_400(request, self.controller.loop)
474 1
        circuit_ids = data.pop("circuit_ids")
475
476 1
        self.mongo_controller.update_evcs_metadata(circuit_ids, data, "add")
477
478 1
        fail_evcs = []
479 1
        for _id in circuit_ids:
480 1
            try:
481 1
                evc = self.circuits[_id]
482 1
                evc.extend_metadata(data)
483 1
            except KeyError:
484 1
                fail_evcs.append(_id)
485
486 1
        if fail_evcs:
487 1
            raise HTTPException(404, detail=fail_evcs)
488 1
        return JSONResponse("Operation successful", status_code=201)
489
490 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
491 1
    @validate_openapi(spec)
492 1
    def add_metadata(self, request: Request) -> JSONResponse:
493
        """Add metadata to an EVC."""
494 1
        circuit_id = request.path_params["circuit_id"]
495 1
        metadata = get_json_or_400(request, self.controller.loop)
496 1
        if not isinstance(metadata, dict):
497
            raise HTTPException(400, f"Invalid metadata value: {metadata}")
498 1
        try:
499 1
            evc = self.circuits[circuit_id]
500 1
        except KeyError as error:
501 1
            raise HTTPException(
502
                404,
503
                detail=f"circuit_id {circuit_id} not found."
504
            ) from error
505
506 1
        evc.extend_metadata(metadata)
507 1
        evc.sync()
508 1
        return JSONResponse("Operation successful", status_code=201)
509
510 1 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
511 1
    @validate_openapi(spec)
512 1
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
513
        """Delete metada from a bulk of EVCs"""
514 1
        data = get_json_or_400(request, self.controller.loop)
515 1
        key = request.path_params["key"]
516 1
        circuit_ids = data.pop("circuit_ids")
517 1
        self.mongo_controller.update_evcs_metadata(
518
            circuit_ids, {key: ""}, "del"
519
        )
520
521 1
        fail_evcs = []
522 1
        for _id in circuit_ids:
523 1
            try:
524 1
                evc = self.circuits[_id]
525 1
                evc.remove_metadata(key)
526 1
            except KeyError:
527 1
                fail_evcs.append(_id)
528
529 1
        if fail_evcs:
530 1
            raise HTTPException(404, detail=fail_evcs)
531 1
        return JSONResponse("Operation successful")
532
533 1
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
534 1
    def delete_metadata(self, request: Request) -> JSONResponse:
535
        """Delete metadata from an EVC."""
536 1
        circuit_id = request.path_params["circuit_id"]
537 1
        key = request.path_params["key"]
538 1
        try:
539 1
            evc = self.circuits[circuit_id]
540 1
        except KeyError as error:
541 1
            raise HTTPException(
542
                404,
543
                detail=f"circuit_id {circuit_id} not found."
544
            ) from error
545
546 1
        evc.remove_metadata(key)
547 1
        evc.sync()
548 1
        return JSONResponse("Operation successful")
549
550 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
551 1
    def redeploy(self, request: Request) -> JSONResponse:
552
        """Endpoint to force the redeployment of an EVC."""
553 1
        circuit_id = request.path_params["circuit_id"]
554 1
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
555 1
        try:
556 1
            evc = self.circuits[circuit_id]
557 1
        except KeyError:
558 1
            raise HTTPException(
559
                404,
560
                detail=f"circuit_id {circuit_id} not found"
561
            ) from KeyError
562 1
        deployed = False
563 1
        if evc.is_enabled():
564 1
            with evc.lock:
565 1
                evc.remove_current_flows(sync=False)
566 1
                evc.remove_failover_flows(sync=True)
567 1
                deployed = evc.deploy()
568 1
        if deployed:
569 1
            result = {"response": f"Circuit {circuit_id} redeploy received."}
570 1
            status = 202
571
        else:
572 1
            result = {
573
                "response": f"Circuit {circuit_id} is disabled."
574
            }
575 1
            status = 409
576
577 1
        return JSONResponse(result, status_code=status)
578
579 1
    @rest("/v2/evc/schedule/", methods=["POST"])
580 1
    @validate_openapi(spec)
581 1
    def create_schedule(self, request: Request) -> JSONResponse:
582
        """
583
        Create a new schedule for a given circuit.
584
585
        This service do no check if there are conflicts with another schedule.
586
        Payload example:
587
            {
588
              "circuit_id":"aa:bb:cc",
589
              "schedule": {
590
                "date": "2019-08-07T14:52:10.967Z",
591
                "interval": "string",
592
                "frequency": "1 * * * *",
593
                "action": "create"
594
              }
595
            }
596
        """
597 1
        log.debug("create_schedule /v2/evc/schedule/")
598 1
        data = get_json_or_400(request, self.controller.loop)
599 1
        circuit_id = data["circuit_id"]
600 1
        schedule_data = data["schedule"]
601
602
        # Get EVC from circuits buffer
603 1
        circuits = self._get_circuits_buffer()
604
605
        # get the circuit
606 1
        evc = circuits.get(circuit_id)
607
608
        # get the circuit
609 1
        if not evc:
610 1
            result = f"circuit_id {circuit_id} not found"
611 1
            log.debug("create_schedule result %s %s", result, 404)
612 1
            raise HTTPException(404, detail=result)
613
614
        # new schedule from dict
615 1
        new_schedule = CircuitSchedule.from_dict(schedule_data)
616
617
        # If there is no schedule, create the list
618 1
        if not evc.circuit_scheduler:
619 1
            evc.circuit_scheduler = []
620
621
        # Add the new schedule
622 1
        evc.circuit_scheduler.append(new_schedule)
623
624
        # Add schedule job
625 1
        self.sched.add_circuit_job(evc, new_schedule)
626
627
        # save circuit to mongodb
628 1
        evc.sync()
629
630 1
        result = new_schedule.as_dict()
631 1
        status = 201
632
633 1
        log.debug("create_schedule result %s %s", result, status)
634 1
        return JSONResponse(result, status_code=status)
635
636 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
637 1
    @validate_openapi(spec)
638 1
    def update_schedule(self, request: Request) -> JSONResponse:
639
        """Update a schedule.
640
641
        Change all attributes from the given schedule from a EVC circuit.
642
        The schedule ID is preserved as default.
643
        Payload example:
644
            {
645
              "date": "2019-08-07T14:52:10.967Z",
646
              "interval": "string",
647
              "frequency": "1 * * *",
648
              "action": "create"
649
            }
650
        """
651 1
        data = get_json_or_400(request, self.controller.loop)
652 1
        schedule_id = request.path_params["schedule_id"]
653 1
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)
654
655
        # Try to find a circuit schedule
656 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
657
658
        # Can not modify circuits deleted and archived
659 1
        if not found_schedule:
660 1
            result = f"schedule_id {schedule_id} not found"
661 1
            log.debug("update_schedule result %s %s", result, 404)
662 1
            raise HTTPException(404, detail=result)
663
664 1
        new_schedule = CircuitSchedule.from_dict(data)
665 1
        new_schedule.id = found_schedule.id
666
        # Remove the old schedule
667 1
        evc.circuit_scheduler.remove(found_schedule)
668
        # Append the modified schedule
669 1
        evc.circuit_scheduler.append(new_schedule)
670
671
        # Cancel all schedule jobs
672 1
        self.sched.cancel_job(found_schedule.id)
673
        # Add the new circuit schedule
674 1
        self.sched.add_circuit_job(evc, new_schedule)
675
        # Save EVC to mongodb
676 1
        evc.sync()
677
678 1
        result = new_schedule.as_dict()
679 1
        status = 200
680
681 1
        log.debug("update_schedule result %s %s", result, status)
682 1
        return JSONResponse(result, status_code=status)
683
684 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
685 1
    def delete_schedule(self, request: Request) -> JSONResponse:
686
        """Remove a circuit schedule.
687
688
        Remove the Schedule from EVC.
689
        Remove the Schedule from cron job.
690
        Save the EVC to the Storehouse.
691
        """
692 1
        schedule_id = request.path_params["schedule_id"]
693 1
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
694 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
695
696
        # Can not modify circuits deleted and archived
697 1
        if not found_schedule:
698 1
            result = f"schedule_id {schedule_id} not found"
699 1
            log.debug("delete_schedule result %s %s", result, 404)
700 1
            raise HTTPException(404, detail=result)
701
702
        # Remove the old schedule
703 1
        evc.circuit_scheduler.remove(found_schedule)
704
705
        # Cancel all schedule jobs
706 1
        self.sched.cancel_job(found_schedule.id)
707
        # Save EVC to mongodb
708 1
        evc.sync()
709
710 1
        result = "Schedule removed"
711 1
        status = 200
712
713 1
        log.debug("delete_schedule result %s %s", result, status)
714 1
        return JSONResponse(result, status_code=status)
715
716 1
    def _check_no_tag_duplication(
717
        self,
718
        evc_id: str,
719
        uni_a: Optional[UNI] = None,
720
        uni_z: Optional[UNI] = None
721
    ):
722
        """Check if the given EVC has UNIs with no tag and if these are
723
         duplicated. Raise DuplicatedNoTagUNI if duplication is found.
724
        Args:
725
            evc (dict): EVC to be analyzed.
726
        """
727
728
        # No UNIs
729 1
        if not (uni_a or uni_z):
730 1
            return
731
732 1
        if (not (uni_a and not uni_a.user_tag) and
733
                not (uni_z and not uni_z.user_tag)):
734 1
            return
735 1
        for circuit in self.circuits.copy().values():
736 1
            if (not circuit.archived and circuit._id != evc_id):
737 1
                if uni_a and uni_a.user_tag is None:
738 1
                    circuit.check_no_tag_duplicate(uni_a)
739 1
                if uni_z and uni_z.user_tag is None:
740 1
                    circuit.check_no_tag_duplicate(uni_z)
741
742 1
    @listen_to("kytos/topology.link_up")
743 1
    def on_link_up(self, event):
744
        """Change circuit when link is up or end_maintenance."""
745
        self.handle_link_up(event)
746
747 1
    def handle_link_up(self, event):
748
        """Change circuit when link is up or end_maintenance."""
749 1
        log.info("Event handle_link_up %s", event.content["link"])
750 1
        for evc in self.get_evcs_by_svc_level():
751 1
            if evc.is_enabled() and not evc.archived:
752 1
                with evc.lock:
753 1
                    evc.handle_link_up(event.content["link"])
754
755
    # Possibly replace this with interruptions?
756 1
    @listen_to(
757
        '.*.switch.interface.(link_up|link_down|created|deleted)'
758
    )
759 1
    def on_interface_link_change(self, event: KytosEvent):
760
        """
761
        Handler for interface link_up and link_down events.
762
        """
763
        self.handle_on_interface_link_change(event)
764
765 1
    def handle_on_interface_link_change(self, event: KytosEvent):
766
        """
767
        Handler to sort interface events {link_(up, down), create, deleted}
768
769
        To avoid multiple database updated (link flap):
770
        Every interface is identfied and processed in parallel.
771
        Once an interface event is received a time is started.
772
        While time is running self._intf_events will be updated.
773
        After time has passed last received event will be processed.
774
        """
775 1
        iface = event.content.get("interface")
776 1
        with self._lock_interfaces[iface.id]:
777 1
            _now = event.timestamp
778
            # Return out of order events
779 1
            if (
780
                iface.id in self._intf_events
781
                and self._intf_events[iface.id]["event"].timestamp > _now
782
            ):
783 1
                return
784 1
            self._intf_events[iface.id].update({"event": event})
785 1
            if "last_acquired" in self._intf_events[iface.id]:
786 1
                return
787 1
            self._intf_events[iface.id].update({"last_acquired": now()})
788 1
        time.sleep(settings.UNI_STATE_CHANGE_DELAY)
789 1
        with self._lock_interfaces[iface.id]:
790 1
            event = self._intf_events[iface.id]["event"]
791 1
            self._intf_events[iface.id].pop('last_acquired', None)
792 1
            _, _, event_type = event.name.rpartition('.')
793 1
            if event_type in ('link_up', 'created'):
794 1
                self.handle_interface_link_up(iface)
795 1
            elif event_type in ('link_down', 'deleted'):
796 1
                self.handle_interface_link_down(iface)
797
798 1
    def handle_interface_link_up(self, interface):
799
        """
800
        Handler for interface link_up events
801
        """
802
        log.info("Event handle_interface_link_up %s", interface)
803
        for evc in self.get_evcs_by_svc_level():
804
            with evc.lock:
805
                evc.handle_interface_link_up(
806
                    interface
807
                )
808
809 1
    def handle_interface_link_down(self, interface):
810
        """
811
        Handler for interface link_down events
812
        """
813
        log.info("Event handle_interface_link_down %s", interface)
814
        for evc in self.get_evcs_by_svc_level():
815
            with evc.lock:
816
                evc.handle_interface_link_down(
817
                    interface
818
                )
819
820 1
    @listen_to("kytos/topology.link_down", pool="dynamic_single")
821 1
    def on_link_down(self, event):
822
        """Change circuit when link is down or under_mantenance."""
823
        self.handle_link_down(event)
824
825
    # pylint: disable=too-many-branches
826
    # pylint: disable=too-many-locals
827 1
    def handle_link_down(self, event):
828
        """Change circuit when link is down or under_mantenance."""
829 1
        link = event.content["link"]
830 1
        log.info("Event handle_link_down %s", link)
831 1
        switch_flows = {}
832 1
        evcs_with_failover = []
833 1
        evcs_normal = []
834 1
        check_failover = []
835 1
        failover_event_contents = {}
836
837 1
        for evc in self.get_evcs_by_svc_level():
838 1
            with evc.lock:
839 1
                if evc.is_affected_by_link(link):
840 1
                    evc.affected_by_link_at = event.timestamp
841
                    # if there is no failover path, handles link down the
842
                    # tradditional way
843 1
                    if (
844
                        not evc.failover_path or
845
                        evc.is_failover_path_affected_by_link(link)
846
                    ):
847 1
                        evcs_normal.append(evc)
848 1
                        continue
849 1
                    try:
850 1
                        dpid_flows = evc.get_failover_flows()
851 1
                        evc.old_path = evc.current_path
852 1
                        evc.current_path = evc.failover_path
853 1
                        evc.failover_path = Path([])
854
                    # pylint: disable=broad-except
855 1
                    except Exception:
856 1
                        err = traceback.format_exc().replace("\n", ", ")
857 1
                        log.error(
858
                            "Ignore Failover path for "
859
                            f"{evc} due to error: {err}"
860
                        )
861 1
                        evcs_normal.append(evc)
862 1
                        continue
863 1
                    for dpid, flows in dpid_flows.items():
864 1
                        switch_flows.setdefault(dpid, [])
865 1
                        switch_flows[dpid].extend(flows)
866 1
                    evcs_with_failover.append(evc)
867 1
                    failover_event_contents[evc.id] = map_evc_event_content(
868
                        evc,
869
                        flows={k: v.copy() for k, v in switch_flows.items()}
870
                    )
871 1
                elif evc.is_failover_path_affected_by_link(link):
872 1
                    evc.old_path = evc.failover_path
873 1
                    evc.failover_path = Path([])
874 1
                    check_failover.append(evc)
875
876 1
        if failover_event_contents:
877 1
            emit_event(self.controller, "failover_link_down",
878
                       content=failover_event_contents)
879 1
        send_flow_mods_event(self.controller, switch_flows, 'install')
880
881 1
        for evc in evcs_normal:
882 1
            emit_event(
883
                self.controller,
884
                "evc_affected_by_link_down",
885
                content={"link": link} | map_evc_event_content(evc)
886
            )
887
888 1
        evcs_to_update = []
889 1
        for evc in evcs_with_failover:
890 1
            evcs_to_update.append(evc.as_dict())
891 1
            log.info(
892
                f"{evc} redeployed with failover due to link down {link.id}"
893
            )
894 1
        for evc in check_failover:
895 1
            evcs_to_update.append(evc.as_dict())
896
897 1
        self.mongo_controller.update_evcs(evcs_to_update)
898
899 1
        emit_event(
900
            self.controller,
901
            "cleanup_evcs_old_path",
902
            content={"evcs": evcs_with_failover + check_failover}
903
        )
904
905 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
906 1
    def on_evc_affected_by_link_down(self, event):
907
        """Change circuit when link is down or under_mantenance."""
908
        self.handle_evc_affected_by_link_down(event)
909
910 1
    def handle_evc_affected_by_link_down(self, event):
911
        """Change circuit when link is down or under_mantenance."""
912 1
        evc = self.circuits.get(event.content["evc_id"])
913 1
        link = event.content['link']
914 1
        if not evc:
915 1
            return
916 1
        with evc.lock:
917 1
            if not evc.is_affected_by_link(link):
918
                return
919 1
            result = evc.handle_link_down()
920 1
        event_name = "error_redeploy_link_down"
921 1
        if result:
922 1
            log.info(f"{evc} redeployed due to link down {link.id}")
923 1
            event_name = "redeployed_link_down"
924 1
        emit_event(self.controller, event_name,
925
                   content=map_evc_event_content(evc))
926
927 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
928 1
    def on_evc_deployed(self, event):
929
        """Handle EVC deployed|redeployed_link_down."""
930
        self.handle_evc_deployed(event)
931
932 1
    def handle_evc_deployed(self, event):
933
        """Setup failover path on evc deployed."""
934
        evc = self.circuits.get(event.content["evc_id"])
935
        if not evc:
936
            return
937
        evc.try_setup_failover_path()
938
939 1
    @listen_to("kytos/mef_eline.cleanup_evcs_old_path")
940 1
    def on_cleanup_evcs_old_path(self, event):
941
        """Handle cleanup evcs old path."""
942
        self.handle_cleanup_evcs_old_path(event)
943
944 1
    def handle_cleanup_evcs_old_path(self, event):
945
        """Handle cleanup evcs old path."""
946 1
        evcs = event.content.get("evcs", [])
947 1
        event_contents: dict[str, dict] = defaultdict(list)
948 1
        total_flows = {}
949 1
        for evc in evcs:
950 1
            if not evc.old_path:
951 1
                continue
952 1
            with evc.lock:
953 1
                removed_flows = {}
954 1
                try:
955 1
                    nni_flows = evc._prepare_nni_flows(evc.old_path)
956 1
                    uni_flows = evc._prepare_uni_flows(
957
                        evc.old_path, skip_in=True
958
                    )
959 1
                    removed_flows = merge_flow_dicts(nni_flows, uni_flows)
960
                # pylint: disable=broad-except
961
                except Exception:
962
                    err = traceback.format_exc().replace("\n", ", ")
963
                    log.error(f"Fail to remove {evc} old_path: {err}")
964
                    continue
965 1
            if removed_flows:
966 1
                total_flows = merge_flow_dicts(removed_flows, total_flows)
967 1
                content = map_evc_event_content(
968
                    evc,
969
                    removed_flows=removed_flows,
970
                    current_path=evc.current_path.as_dict(),
971
                )
972 1
                event_contents[evc.id] = content
973 1
        if event_contents:
974 1
            send_flow_mods_event(self.controller, total_flows, 'delete')
975 1
            emit_event(self.controller, "failover_old_path",
976
                       content=event_contents)
977
978 1
    @listen_to("kytos/topology.topology_loaded")
979 1
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
980
        """Load EVCs once the topology is available."""
981
        self.load_all_evcs()
982
983 1
    def load_all_evcs(self):
984
        """Try to load all EVCs on startup."""
985 1
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
986 1
        for circuit_id, circuit in circuits:
987 1
            if circuit_id not in self.circuits:
988 1
                self._load_evc(circuit)
989 1
        emit_event(self.controller, "evcs_loaded", content=dict(circuits),
990
                   timeout=1)
991
992 1
    def _load_evc(self, circuit_dict):
993
        """Load one EVC from mongodb to memory."""
994 1
        try:
995 1
            evc = self._evc_from_dict(circuit_dict)
996 1
        except (ValueError, KytosTagError) as exception:
997 1
            log.error(
998
                f"Could not load EVC: dict={circuit_dict} error={exception}"
999
            )
1000 1
            return None
1001 1
        if evc.archived:
1002 1
            return None
1003
1004 1
        self.circuits.setdefault(evc.id, evc)
1005 1
        self.sched.add(evc)
1006 1
        return evc
1007
1008 1
    @listen_to("kytos/flow_manager.flow.error")
1009 1
    def on_flow_mod_error(self, event):
1010
        """Handle flow mod errors related to an EVC."""
1011
        self.handle_flow_mod_error(event)
1012
1013 1
    def handle_flow_mod_error(self, event):
1014
        """Handle flow mod errors related to an EVC."""
1015 1
        flow = event.content["flow"]
1016 1
        command = event.content.get("error_command")
1017 1
        if command != "add":
1018
            return
1019 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
1020 1
        if evc:
1021 1
            with evc.lock:
1022 1
                evc.remove_current_flows(sync=False)
1023 1
                evc.remove_failover_flows(sync=True)
1024
1025 1
    def _evc_dict_with_instances(self, evc_dict):
1026
        """Convert some dict values to instance of EVC classes.
1027
1028
        This method will convert: [UNI, Link]
1029
        """
1030 1
        data = evc_dict.copy()  # Do not modify the original dict
1031 1
        for attribute, value in data.items():
1032
            # Get multiple attributes.
1033
            # Ex: uni_a, uni_z
1034 1
            if "uni" in attribute:
1035 1
                try:
1036 1
                    data[attribute] = self._uni_from_dict(value)
1037 1
                except ValueError as exception:
1038 1
                    result = "Error creating UNI: Invalid value"
1039 1
                    raise ValueError(result) from exception
1040
1041 1
            if attribute == "circuit_scheduler":
1042 1
                data[attribute] = []
1043 1
                for schedule in value:
1044 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
1045
1046
            # Get multiple attributes.
1047
            # Ex: primary_links,
1048
            #     backup_links,
1049
            #     current_links_cache,
1050
            #     primary_links_cache,
1051
            #     backup_links_cache
1052 1
            if "links" in attribute:
1053 1
                data[attribute] = [
1054
                    self._link_from_dict(link) for link in value
1055
                ]
1056
1057
            # Ex: current_path,
1058
            #     primary_path,
1059
            #     backup_path
1060 1
            if "path" in attribute and attribute != "dynamic_backup_path":
1061 1
                data[attribute] = Path(
1062
                    [self._link_from_dict(link) for link in value]
1063
                )
1064
1065 1
        return data
1066
1067 1
    def _evc_from_dict(self, evc_dict):
1068 1
        data = self._evc_dict_with_instances(evc_dict)
1069 1
        data["table_group"] = self.table_group
1070 1
        return EVC(self.controller, **data)
1071
1072 1
    def _uni_from_dict(self, uni_dict):
1073
        """Return a UNI object from python dict."""
1074 1
        if uni_dict is None:
1075 1
            return False
1076
1077 1
        interface_id = uni_dict.get("interface_id")
1078 1
        interface = self.controller.get_interface_by_id(interface_id)
1079 1
        if interface is None:
1080 1
            result = (
1081
                "Error creating UNI:"
1082
                + f"Could not instantiate interface {interface_id}"
1083
            )
1084 1
            raise ValueError(result) from ValueError
1085 1
        tag_convert = {1: "vlan"}
1086 1
        tag_dict = uni_dict.get("tag", None)
1087 1
        if tag_dict:
1088 1
            tag_type = tag_dict.get("tag_type")
1089 1
            tag_type = tag_convert.get(tag_type, tag_type)
1090 1
            tag_value = tag_dict.get("value")
1091 1
            if isinstance(tag_value, list):
1092 1
                tag_value = get_tag_ranges(tag_value)
1093 1
                mask_list = get_vlan_tags_and_masks(tag_value)
1094 1
                tag = TAGRange(tag_type, tag_value, mask_list)
1095
            else:
1096 1
                tag = TAG(tag_type, tag_value)
1097
        else:
1098 1
            tag = None
1099 1
        uni = UNI(interface, tag)
1100 1
        return uni
1101
1102 1
    def _link_from_dict(self, link_dict):
1103
        """Return a Link object from python dict."""
1104 1
        id_a = link_dict.get("endpoint_a").get("id")
1105 1
        id_b = link_dict.get("endpoint_b").get("id")
1106
1107 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
1108 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
1109 1
        if not endpoint_a:
1110 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
1111 1
            raise ValueError(error_msg)
1112 1
        if not endpoint_b:
1113
            error_msg = f"Could not get interface endpoint_b id {id_b}"
1114
            raise ValueError(error_msg)
1115
1116 1
        link = Link(endpoint_a, endpoint_b)
1117 1
        if "metadata" in link_dict:
1118 1
            link.extend_metadata(link_dict.get("metadata"))
1119
1120 1
        s_vlan = link.get_metadata("s_vlan")
1121 1
        if s_vlan:
1122 1
            tag = TAG.from_dict(s_vlan)
1123 1
            if tag is False:
1124
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
1125
                raise ValueError(error_msg)
1126 1
            link.update_metadata("s_vlan", tag)
1127 1
        return link
1128
1129 1
    def _find_evc_by_schedule_id(self, schedule_id):
1130
        """
1131
        Find an EVC and CircuitSchedule based on schedule_id.
1132
1133
        :param schedule_id: Schedule ID
1134
        :return: EVC and Schedule
1135
        """
1136 1
        circuits = self._get_circuits_buffer()
1137 1
        found_schedule = None
1138 1
        evc = None
1139
1140
        # pylint: disable=unused-variable
1141 1
        for c_id, circuit in circuits.items():
1142 1
            for schedule in circuit.circuit_scheduler:
1143 1
                if schedule.id == schedule_id:
1144 1
                    found_schedule = schedule
1145 1
                    evc = circuit
1146 1
                    break
1147 1
            if found_schedule:
1148 1
                break
1149 1
        return evc, found_schedule
1150
1151 1
    def _get_circuits_buffer(self):
1152
        """
1153
        Return the circuit buffer.
1154
1155
        If the buffer is empty, try to load data from mongodb.
1156
        """
1157 1
        if not self.circuits:
1158
            # Load circuits from mongodb to buffer
1159 1
            circuits = self.mongo_controller.get_circuits()['circuits']
1160 1
            for c_id, circuit in circuits.items():
1161 1
                evc = self._evc_from_dict(circuit)
1162 1
                self.circuits[c_id] = evc
1163 1
        return self.circuits
1164
1165
    # pylint: disable=attribute-defined-outside-init
1166 1
    @alisten_to("kytos/of_multi_table.enable_table")
1167 1
    async def on_table_enabled(self, event):
1168
        """Handle a recently table enabled."""
1169 1
        table_group = event.content.get("mef_eline", None)
1170 1
        if not table_group:
1171 1
            return
1172 1
        for group in table_group:
1173 1
            if group not in settings.TABLE_GROUP_ALLOWED:
1174 1
                log.error(f'The table group "{group}" is not allowed for '
1175
                          f'mef_eline. Allowed table groups are '
1176
                          f'{settings.TABLE_GROUP_ALLOWED}')
1177 1
                return
1178 1
        self.table_group.update(table_group)
1179 1
        content = {"group_table": self.table_group}
1180 1
        name = "kytos/mef_eline.enable_table"
1181
        await aemit_event(self.controller, name, content)
1182