Passed
Pull Request — master (#571)
by Vinicius
04:29
created

build.main.Main.on_link_down()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1.037

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 2
dl 0
loc 4
ccs 2
cts 3
cp 0.6667
crap 1.037
rs 10
c 0
b 0
f 0
1
# pylint: disable=protected-access, too-many-lines
2
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from collections import defaultdict
10 1
from copy import deepcopy
11 1
from threading import Lock
12 1
from typing import Optional
13
14 1
from pydantic import ValidationError
15
16 1
from kytos.core import KytosNApp, log, rest
17 1
from kytos.core.events import KytosEvent
18 1
from kytos.core.exceptions import KytosTagError
19 1
from kytos.core.helpers import (alisten_to, listen_to, load_spec, now,
20
                                validate_openapi)
21 1
from kytos.core.interface import TAG, UNI, TAGRange
22 1
from kytos.core.link import Link
23 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
24
                                 get_json_or_400)
25 1
from kytos.core.tag_ranges import get_tag_ranges
26 1
from napps.kytos.mef_eline import controllers, settings
27 1
from napps.kytos.mef_eline.exceptions import (DisabledSwitch,
28
                                              DuplicatedNoTagUNI, InvalidPath)
29 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
30
                                          Path)
31 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
32 1
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
33
                                         emit_event, get_vlan_tags_and_masks,
34
                                         map_evc_event_content,
35
                                         merge_flow_dicts, prepare_delete_flow,
36
                                         send_flow_mods_event)
37
38
39
# pylint: disable=too-many-public-methods
40 1
class Main(KytosNApp):
41
    """Main class of kytos/mef_eline NApp.
42
43
    This class is the entry point for this napp.
44
    """
45
46 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
47
48 1
    def setup(self):
        """Replace the '__init__' method for the KytosNApp subclass.

        The setup method is automatically called by the controller when the
        application is loaded. It creates the scheduler, the MongoDB
        controller, the in-memory circuit buffer and the locks, registers the
        periodic ``execute`` loop and finally loads all stored EVCs.
        """
        # object used to schedule circuit events
        self.sched = Scheduler()

        # object to save and load circuits
        self.mongo_controller = self.get_eline_controller()
        self.mongo_controller.bootstrap_indexes()

        # set the controller that will manage the dynamic paths
        DynamicPathManager.set_controller(self.controller)

        # dictionary of EVCs created. It acts as a circuit buffer.
        # Every create/update/delete must be synced to mongodb.
        self.circuits = {}

        self._intf_events = defaultdict(dict)
        self._lock_interfaces = defaultdict(Lock)
        self.table_group = {"epl": 0, "evpl": 0}
        self._lock = Lock()
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

        self.load_all_evcs()
        self._topology_updated_at = None
78
79 1
    def get_evcs_by_svc_level(self, enable_filter: bool = True) -> list:
80
        """Get circuits sorted by desc service level and asc creation_time.
81
82
        In the future, as more ops are offloaded it should be get from the DB.
83
        """
84 1
        if enable_filter:
85 1
            return sorted(
86
                          [circuit for circuit in self.circuits.values()
87
                           if circuit.is_enabled()],
88
                          key=lambda x: (-x.service_level, x.creation_time),
89
            )
90 1
        return sorted(self.circuits.values(),
91
                      key=lambda x: (-x.service_level, x.creation_time))
92
93 1
    @staticmethod
    def get_eline_controller():
        """Return a new ``controllers.ELineController`` instance."""
        return controllers.ELineController()
97
98 1
    def execute(self):
        """Run one round of the consistency routine.

        The round is skipped when ``self._lock`` is already held. The lock is
        taken with a non-blocking acquire so that the "is it free?" test and
        the acquisition are a single atomic step; a separate ``locked()``
        check could pass and then block on the lock if another thread grabbed
        it in between.
        """
        if not self._lock.acquire(blocking=False):
            return
        try:
            log.debug("Starting consistency routine")
            self.execute_consistency()
        finally:
            self._lock.release()
        log.debug("Finished consistency routine")
106
107 1
    def should_be_checked(self, circuit):
108
        "Verify if the circuit meets the necessary conditions to be checked"
109
        # pylint: disable=too-many-boolean-expressions
110 1
        if (
111
                circuit.is_enabled()
112
                and not circuit.is_active()
113
                and not circuit.lock.locked()
114
                and not circuit.has_recent_removed_flow()
115
                and not circuit.is_recent_updated()
116
                and circuit.are_unis_active(self.controller.switches)
117
                # if a inter-switch EVC does not have current_path, it does not
118
                # make sense to run sdntrace on it
119
                and (circuit.is_intra_switch() or circuit.current_path)
120
                ):
121 1
            return True
122
        return False
123
124 1
    def execute_consistency(self):
        """Execute consistency routine.

        Every circuit in the buffer gets a ``try_setup_failover_path()`` call.
        Circuits accepted by ``should_be_checked`` are additionally traced
        through ``EVCDeploy.check_list_traces``: a circuit whose trace result
        is truthy is activated and synced, otherwise its ``execution_rounds``
        counter is incremented and, once it exceeds
        ``settings.WAIT_FOR_OLD_PATH``, the circuit is deployed again.
        """
        circuits_to_check = []
        for circuit in self.get_evcs_by_svc_level(enable_filter=False):
            if self.should_be_checked(circuit):
                circuits_to_check.append(circuit)
            circuit.try_setup_failover_path()

        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
        for circuit in circuits_to_check:
            if circuits_checked.get(circuit.id):
                circuit.execution_rounds = 0
                log.info(f"{circuit} enabled but inactive - activating")
                with circuit.lock:
                    circuit.activate()
                    circuit.sync()
                continue

            circuit.execution_rounds += 1
            if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
                log.info(f"{circuit} enabled but inactive - redeploy")
                with circuit.lock:
                    circuit.deploy()
146
147 1
    def shutdown(self):
        """Execute when your napp is unloaded.

        If you have some cleanup procedure, insert it here. Currently nothing
        is cleaned up.
        """
152
153 1
    @rest("/v2/evc/", methods=["GET"])
    def list_circuits(self, request: Request) -> JSONResponse:
        """Endpoint to return circuits stored.

        The ``archived`` query arg (default ``"false"``, lower-cased) is
        passed to the MongoDB controller as the archive filter; every other
        query arg is forwarded as a metadata filter.
        """
        log.debug("list_circuits /v2/evc")
        query = request.query_params
        archived = query.get("archived", "false").lower()
        metadata = {
            key: value for key, value in query.items() if key != "archived"
        }
        found = self.mongo_controller.get_circuits(archived=archived,
                                                   metadata=metadata)
        return JSONResponse(found['circuits'])
168
169 1
    @rest("/v2/evc/schedule", methods=["GET"])
    def list_schedules(self, _request: Request) -> JSONResponse:
        """Endpoint to return all schedules stored for all circuits.

        Return a JSON with the following template:
        [{"schedule_id": <schedule_id>,
         "circuit_id": <circuit_id>,
         "schedule": <schedule object>}]
        """
        log.debug("list_schedules /v2/evc/schedule")
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
        if not circuits:
            # NOTE(review): with no circuits the body is an empty object,
            # not an empty list as in the template above; kept as-is so
            # existing API clients see no change.
            return JSONResponse({}, status_code=200)

        status = 200
        result = []
        for circuit in circuits:
            # Circuits without (or with an empty) scheduler contribute nothing
            for schedule in circuit.get("circuit_scheduler") or ():
                result.append({
                    "schedule_id": schedule.get("id"),
                    "circuit_id": circuit.get("id"),
                    "schedule": schedule,
                })

        log.debug("list_schedules result %s %s", result, status)
        return JSONResponse(result, status_code=status)
200
201 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
    def get_circuit(self, request: Request) -> JSONResponse:
        """Endpoint to return a circuit based on id.

        Raises:
            HTTPException: 404 when the MongoDB controller returns no circuit
                for ``circuit_id``.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("get_circuit /v2/evc/%s", circuit_id)
        circuit = self.mongo_controller.get_circuit(circuit_id)
        if circuit:
            status = 200
            log.debug("get_circuit result %s %s", circuit, status)
            return JSONResponse(circuit, status_code=status)

        result = f"circuit_id {circuit_id} not found"
        log.debug("get_circuit result %s %s", result, 404)
        raise HTTPException(404, detail=result)
214
215
    # pylint: disable=too-many-branches, too-many-statements
216 1
    @rest("/v2/evc/", methods=["POST"])
    @validate_openapi(spec)
    def create_circuit(self, request: Request) -> JSONResponse:
        """Try to create a new circuit.

        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
        UNI_Z's requested C-VID are available from the interfaces' pools. This
        is checked when creating the UNI object.

        Then, E-Line NApp requests a primary and a backup path to the
        Pathfinder NApp using the attributes primary_links and backup_links
        submitted via REST

        # For each link composing paths in #3:
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
        #  - Using the S-VID obtained, generate abstract flow entries to be
        #    sent to FlowManager

        Push abstract flow entries to FlowManager and FlowManager pushes
        OpenFlow entries to datapaths

        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
        creation

        Finally, notify user of the status of its request.

        Returns:
            A 201 response with ``circuit_id`` and ``deployed``.

        Raises:
            HTTPException: 400 for an invalid payload, invalid static path,
                mismatching UNI tag lists, unavailable UNI tags or a
                persistence validation error; 409 for a disabled component
                or a duplicated untagged UNI.
        """
        # Try to create the circuit object
        log.debug("create_circuit /v2/evc/")
        data = get_json_or_400(request, self.controller.loop)

        try:
            evc = self._evc_from_dict(data)
        except (ValueError, KytosTagError) as exception:
            log.debug("create_circuit result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception
        try:
            check_disabled_component(evc.uni_a, evc.uni_z)
        except DisabledSwitch as exception:
            log.debug("create_circuit result %s %s", exception, 409)
            raise HTTPException(
                    409,
                    detail=f"Path is not valid: {exception}"
                ) from exception

        # Validate the static paths, when provided, against both UNI switches
        for path_name in ("primary_path", "backup_path"):
            path = getattr(evc, path_name)
            if not path:
                continue
            try:
                path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise HTTPException(
                    400,
                    detail=f"{path_name} is not valid: {exception}"
                ) from exception

        if not evc._tag_lists_equal():
            detail = "UNI_A and UNI_Z tag lists should be the same."
            raise HTTPException(400, detail=detail)

        try:
            evc._validate_has_primary_or_dynamic()
        except ValueError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        try:
            self._check_no_tag_duplication(evc.id, evc.uni_a, evc.uni_z)
        except DuplicatedNoTagUNI as exception:
            log.debug("create_circuit result %s %s", exception, 409)
            raise HTTPException(409, detail=str(exception)) from exception

        try:
            self._use_uni_tags(evc)
        except KytosTagError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        # save circuit
        try:
            evc.sync()
        except ValidationError as exception:
            # The circuit is rejected, so give back the UNI tags reserved
            # just above; otherwise they would stay in use by an EVC that
            # was never stored.
            evc.remove_uni_tags()
            raise HTTPException(400, detail=str(exception)) from exception

        # store circuit in dictionary
        self.circuits[evc.id] = evc

        # Schedule the circuit deploy
        self.sched.add(evc)

        # Circuit has no schedule, deploy now
        deployed = False
        if not evc.circuit_scheduler:
            with evc.lock:
                deployed = evc.deploy()

        # Notify users
        result = {"circuit_id": evc.id, "deployed": deployed}
        status = 201
        log.debug("create_circuit result %s %s", result, status)
        emit_event(self.controller, name="created",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)
330
331 1
    @staticmethod
332 1
    def _use_uni_tags(evc):
333 1
        uni_a = evc.uni_a
334 1
        evc._use_uni_vlan(uni_a)
335 1
        try:
336 1
            uni_z = evc.uni_z
337 1
            evc._use_uni_vlan(uni_z)
338 1
        except KytosTagError as err:
339 1
            evc.make_uni_vlan_available(uni_a)
340 1
            raise err
341
342 1
    @listen_to('kytos/flow_manager.flow.removed')
    def on_flow_delete(self, event):
        """Capture delete messages to keep track when flows got removed.

        Thin event-listener wrapper around ``handle_flow_delete``.
        """
        self.handle_flow_delete(event)
346
347 1
    def handle_flow_delete(self, event):
        """Keep track when the EVC got flows removed by deriving its cookie.

        The EVC id is derived from the removed flow's cookie; when that id is
        in the circuit buffer its flow-removed timestamp is refreshed.
        """
        removed_flow = event.content["flow"]
        evc_id = EVC.get_id_from_cookie(removed_flow.cookie)
        evc = self.circuits.get(evc_id)
        if not evc:
            return
        log.debug("Flow removed in EVC %s", evc.id)
        evc.set_flow_removed_at()
354
355 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update(self, request: Request) -> JSONResponse:
        """Update a circuit based on payload.

        The EVC attributes (creation_time, active, current_path,
        failover_path, _id, archived) can't be updated.

        Returns:
            A 200 response holding the updated EVC dict keyed by its id and
            a ``redeployed`` flag.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer, 400 for
                invalid data, 409 for a duplicated untagged UNI or a disabled
                component.
        """
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = request.path_params["circuit_id"]
        log.debug("update /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("update result %s %s", result, 404)
            # Chain from the caught instance (not the KeyError class) so the
            # missing key is kept as the cause.
            raise HTTPException(404, detail=result) from error

        try:
            updated_data = self._evc_dict_with_instances(data)
            self._check_no_tag_duplication(
                circuit_id, updated_data.get("uni_a"),
                updated_data.get("uni_z")
            )
            enable, redeploy = evc.update(**updated_data)
        except (ValueError, KytosTagError, ValidationError) as exception:
            log.debug("update result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception
        except DuplicatedNoTagUNI as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(409, detail=str(exception)) from exception
        except DisabledSwitch as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(
                    409,
                    detail=f"Path is not valid: {exception}"
                ) from exception
        redeployed = False
        if evc.is_active():
            if enable is False:  # disable if active
                with evc.lock:
                    evc.remove()
            elif redeploy is not None:  # redeploy if active
                with evc.lock:
                    evc.remove()
                    redeployed = evc.deploy()
        else:
            if enable is True:  # enable if inactive
                with evc.lock:
                    redeployed = evc.deploy()
            elif evc.is_enabled() and redeploy:
                with evc.lock:
                    evc.remove()
                    redeployed = evc.deploy()
        result = {evc.id: evc.as_dict(), 'redeployed': redeployed}
        status = 200

        log.debug("update result %s %s", result, status)
        emit_event(self.controller, "updated",
                   content=map_evc_event_content(evc, **data))
        return JSONResponse(result, status_code=status)
416
417 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
    def delete_circuit(self, request: Request) -> JSONResponse:
        """Remove a circuit.

        The circuit is popped from the buffer. Unless it is already archived
        it is then, under its lock, deactivated and disabled, removed from
        the scheduler, has its current and failover flows removed, is
        archived, releases its UNI tags, is synced and a "deleted" event is
        emitted.

        Raises:
            HTTPException: 404 when ``circuit_id`` is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits.pop(circuit_id)
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("delete_circuit result %s %s", result, 404)
            # Chain from the caught instance, not from the KeyError class
            raise HTTPException(404, detail=result) from error
        log.info("Removing %s", evc)

        with evc.lock:
            if not evc.archived:
                evc.deactivate()
                evc.disable()
                self.sched.remove(evc)
                evc.remove_current_flows(sync=False)
                evc.remove_failover_flows(sync=False)
                evc.archive()
                evc.remove_uni_tags()
                evc.sync()
                emit_event(
                    self.controller, "deleted",
                    content=map_evc_event_content(evc)
                )

        log.info("EVC removed. %s", evc)
        result = {"response": f"Circuit {circuit_id} removed"}
        status = 200
        log.debug("delete_circuit result %s %s", result, status)

        return JSONResponse(result, status_code=status)
455
456 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
    def get_metadata(self, request: Request) -> JSONResponse:
        """Get metadata from an EVC.

        Raises:
            HTTPException: 404 when ``circuit_id`` is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error
        return JSONResponse({"metadata": evc.metadata})
469
470 1 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
    @validate_openapi(spec)
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to a bulk of EVCs.

        The payload's ``circuit_ids`` entry selects the EVCs; the remaining
        entries are the metadata to add. MongoDB is updated first, then each
        buffered EVC is extended.

        Raises:
            HTTPException: 404 listing the ids that are not in the buffer.
        """
        data = get_json_or_400(request, self.controller.loop)
        circuit_ids = data.pop("circuit_ids")

        self.mongo_controller.update_evcs_metadata(circuit_ids, data, "add")

        missing_ids = []
        for evc_id in circuit_ids:
            try:
                evc = self.circuits[evc_id]
                evc.extend_metadata(data)
            except KeyError:
                missing_ids.append(evc_id)

        if missing_ids:
            raise HTTPException(404, detail=missing_ids)
        return JSONResponse("Operation successful", status_code=201)
490
491 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
    @validate_openapi(spec)
    def add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to an EVC.

        Raises:
            HTTPException: 400 when the payload is not a dict, 404 when
                ``circuit_id`` is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        metadata = get_json_or_400(request, self.controller.loop)
        if not isinstance(metadata, dict):
            raise HTTPException(400, f"Invalid metadata value: {metadata}")
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error

        evc.extend_metadata(metadata)
        evc.sync()
        return JSONResponse("Operation successful", status_code=201)
510
511 1 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
    @validate_openapi(spec)
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
        """Delete metadata from a bulk of EVCs.

        The metadata ``key`` comes from the path and the target EVCs from the
        payload's ``circuit_ids`` entry. MongoDB is updated first, then the
        key is removed from each buffered EVC.

        Raises:
            HTTPException: 404 listing the ids that are not in the buffer.
        """
        data = get_json_or_400(request, self.controller.loop)
        key = request.path_params["key"]
        circuit_ids = data.pop("circuit_ids")
        self.mongo_controller.update_evcs_metadata(
            circuit_ids, {key: ""}, "del"
        )

        missing_ids = []
        for evc_id in circuit_ids:
            try:
                evc = self.circuits[evc_id]
                evc.remove_metadata(key)
            except KeyError:
                missing_ids.append(evc_id)

        if missing_ids:
            raise HTTPException(404, detail=missing_ids)
        return JSONResponse("Operation successful")
533
534 1
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
    def delete_metadata(self, request: Request) -> JSONResponse:
        """Delete metadata from an EVC.

        Raises:
            HTTPException: 404 when ``circuit_id`` is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        key = request.path_params["key"]
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error

        evc.remove_metadata(key)
        evc.sync()
        return JSONResponse("Operation successful")
550
551 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
    def redeploy(self, request: Request) -> JSONResponse:
        """Endpoint to force the redeployment of an EVC.

        The optional ``avoid_vlan`` query arg ("true" by default) is passed
        as ``return_path`` when the current flows are removed, and the value
        returned by that call is handed to ``evc.deploy``.

        Returns:
            202 when the deploy succeeded; 409 when the circuit is disabled
            or when the deploy did not succeed.

        Raises:
            HTTPException: 400 for an invalid ``avoid_vlan`` value, 404 when
                ``circuit_id`` is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        try_avoid_same_s_vlan = request.query_params.get("avoid_vlan", "true")
        try_avoid_same_s_vlan = try_avoid_same_s_vlan.lower()
        if try_avoid_same_s_vlan not in {"true", "false"}:
            msg = "Parameter avoid_vlan does not have a valid value."
            raise HTTPException(400, detail=msg)
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            # Chain from the caught instance, not from the KeyError class
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found"
            ) from error

        if not evc.is_enabled():
            result = {
                "response": f"Circuit {circuit_id} is disabled."
            }
            return JSONResponse(result, status_code=409)

        with evc.lock:
            path_dict = evc.remove_current_flows(
                sync=False,
                return_path=try_avoid_same_s_vlan == "true"
            )
            evc.remove_failover_flows(sync=True)
            deployed = evc.deploy(path_dict)

        if deployed:
            result = {"response": f"Circuit {circuit_id} redeploy received."}
            status = 202
        else:
            # The circuit is enabled here, so reporting it as "disabled"
            # would be wrong; the status code is unchanged.
            result = {
                "response": f"Circuit {circuit_id} was not redeployed."
            }
            status = 409

        return JSONResponse(result, status_code=status)
587
588 1
    @rest("/v2/evc/schedule/", methods=["POST"])
    @validate_openapi(spec)
    def create_schedule(self, request: Request) -> JSONResponse:
        """
        Create a new schedule for a given circuit.

        This service does not check if there are conflicts with another
        schedule.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }

        Returns:
            A 201 response with the new schedule as a dict.

        Raises:
            HTTPException: 404 when the circuit is not found in the buffer.
        """
        log.debug("create_schedule /v2/evc/schedule/")
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = data["circuit_id"]
        schedule_data = data["schedule"]

        # Get EVC from circuits buffer
        circuits = self._get_circuits_buffer()
        evc = circuits.get(circuit_id)

        # Unknown circuit: nothing to schedule
        if not evc:
            result = f"circuit_id {circuit_id} not found"
            log.debug("create_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # new schedule from dict
        new_schedule = CircuitSchedule.from_dict(schedule_data)

        # If there is no schedule, create the list
        if not evc.circuit_scheduler:
            evc.circuit_scheduler = []

        # Add the new schedule
        evc.circuit_scheduler.append(new_schedule)

        # Add schedule job
        self.sched.add_circuit_job(evc, new_schedule)

        # save circuit to mongodb
        evc.sync()

        result = new_schedule.as_dict()
        status = 201

        log.debug("create_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)
644
645 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update_schedule(self, request: Request) -> JSONResponse:
        """Update a schedule.

        Change all attributes from the given schedule from a EVC circuit.
        The schedule ID is preserved as default.
        Payload example:
            {
              "date": "2019-08-07T14:52:10.967Z",
              "interval": "string",
              "frequency": "1 * * *",
              "action": "create"
            }

        Returns:
            A 200 response with the replaced schedule as a dict.

        Raises:
            HTTPException: 404 when no schedule with ``schedule_id`` is found.
        """
        data = get_json_or_400(request, self.controller.loop)
        schedule_id = request.path_params["schedule_id"]
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)

        # Try to find a circuit schedule
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)

        # Can not modify circuits deleted and archived
        if not found_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("update_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # Build the replacement, keeping the id of the schedule it replaces
        new_schedule = CircuitSchedule.from_dict(data)
        new_schedule.id = found_schedule.id
        # Remove the old schedule
        evc.circuit_scheduler.remove(found_schedule)
        # Append the modified schedule
        evc.circuit_scheduler.append(new_schedule)

        # Cancel all schedule jobs
        self.sched.cancel_job(found_schedule.id)
        # Add the new circuit schedule
        self.sched.add_circuit_job(evc, new_schedule)
        # Save EVC to mongodb
        evc.sync()

        result = new_schedule.as_dict()
        status = 200

        log.debug("update_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)
692
693 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
    def delete_schedule(self, request: Request) -> JSONResponse:
        """Remove a circuit schedule.

        Remove the Schedule from EVC.
        Remove the Schedule from cron job.
        Save the EVC to mongodb.

        Raises:
            HTTPException: 404 when no schedule with ``schedule_id`` is found.
        """
        schedule_id = request.path_params["schedule_id"]
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)

        # Can not modify circuits deleted and archived
        if not found_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("delete_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # Remove the old schedule
        evc.circuit_scheduler.remove(found_schedule)

        # Cancel all schedule jobs
        self.sched.cancel_job(found_schedule.id)
        # Save EVC to mongodb
        evc.sync()

        result = "Schedule removed"
        status = 200

        log.debug("delete_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)
724
725 1
    def _check_no_tag_duplication(
        self,
        evc_id: str,
        uni_a: Optional[UNI] = None,
        uni_z: Optional[UNI] = None
    ):
        """Check whether an untagged UNI is already used by another EVC.

        Every non-archived circuit in the buffer, except the one whose
        ``_id`` equals ``evc_id``, is asked to ``check_no_tag_duplicate``
        each given UNI whose ``user_tag`` is None.

        Args:
            evc_id: id of the EVC being created or updated; that circuit is
                skipped.
            uni_a: UNI A to check, or None when not provided.
            uni_z: UNI Z to check, or None when not provided.

        Raises:
            DuplicatedNoTagUNI: if duplication is found (expected to come
                from ``check_no_tag_duplicate``; callers catch it).
        """
        # No UNIs
        if not (uni_a or uni_z):
            return

        # Nothing to verify unless at least one given UNI has no user tag
        untagged_a = uni_a and not uni_a.user_tag
        untagged_z = uni_z and not uni_z.user_tag
        if not (untagged_a or untagged_z):
            return

        # Iterate over a copy so concurrent buffer changes do not break the
        # loop
        for circuit in self.circuits.copy().values():
            if circuit.archived or circuit._id == evc_id:
                continue
            if uni_a and uni_a.user_tag is None:
                circuit.check_no_tag_duplicate(uni_a)
            if uni_z and uni_z.user_tag is None:
                circuit.check_no_tag_duplicate(uni_z)
750
751 1
    @listen_to("kytos/topology.link_up")
    def on_link_up(self, event):
        """Listen for topology link_up events and delegate the handling."""
        self.handle_link_up(event)
755
756 1
    def handle_link_up(self, event):
        """Notify every enabled, non-archived EVC that a link is up.

        The link is taken from ``event.content["link"]`` and each EVC is
        handled while holding its own lock.
        """
        link = event.content["link"]
        log.info("Event handle_link_up %s", link)
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_enabled() or evc.archived:
                continue
            with evc.lock:
                evc.handle_link_up(link)
763
764
    # Possibly replace this with interruptions?
765 1
    @listen_to(
        '.*.switch.interface.(link_up|link_down|created|deleted)'
    )
    def on_interface_link_change(self, event: KytosEvent):
        """Listen for interface link_up, link_down, created and deleted
        events and delegate the handling.
        """
        self.handle_on_interface_link_change(event)
773
774 1
    def handle_on_interface_link_change(self, event: KytosEvent):
        """Debounce interface events {link_(up, down), created, deleted}.

        To avoid multiple updates while a link flaps, events are tracked
        per interface id:

        - An event older than the one already stored is discarded.
        - The first event for an interface opens a window: it records
          ``last_acquired`` and waits ``settings.UNI_STATE_CHANGE_DELAY``.
        - Events arriving while the window is open only replace the stored
          event and return.
        - When the wait is over, the last stored event decides whether the
          link up or the link down handler is called. The handler receives
          the interface of the event that opened the window.
        """
        iface = event.content.get("interface")
        intf_id = iface.id
        with self._lock_interfaces[intf_id]:
            # Discard out of order events.
            if intf_id in self._intf_events:
                stored = self._intf_events[intf_id]["event"]
                if stored.timestamp > event.timestamp:
                    return
            # NOTE(review): relies on self._intf_events creating a missing
            # entry on access (presumably a defaultdict) - confirm.
            state = self._intf_events[intf_id]
            state["event"] = event
            # Another call already opened the window and will process the
            # event stored above.
            if "last_acquired" in state:
                return
            state["last_acquired"] = now()

        time.sleep(settings.UNI_STATE_CHANGE_DELAY)

        with self._lock_interfaces[intf_id]:
            state = self._intf_events[intf_id]
            latest = state["event"]
            state.pop('last_acquired', None)
            event_type = latest.name.rpartition('.')[2]
            if event_type in ('link_up', 'created'):
                self.handle_interface_link_up(iface)
            elif event_type in ('link_down', 'deleted'):
                self.handle_interface_link_down(iface)
806
807 1
    def handle_interface_link_up(self, interface):
        """Forward an interface link up to every EVC.

        Each EVC is notified while holding its own lock.
        """
        log.info("Event handle_interface_link_up %s", interface)
        evcs = self.get_evcs_by_svc_level()
        for evc in evcs:
            with evc.lock:
                evc.handle_interface_link_up(interface)
817
818 1
    def handle_interface_link_down(self, interface):
        """Forward an interface link down to every EVC.

        Each EVC is notified while holding its own lock.
        """
        log.info("Event handle_interface_link_down %s", interface)
        evcs = self.get_evcs_by_svc_level()
        for evc in evcs:
            with evc.lock:
                evc.handle_interface_link_down(interface)
828
829 1
    @listen_to("kytos/topology.link_down", pool="dynamic_single")
    def on_link_down(self, event):
        """Listen for topology link_down events and delegate the handling."""
        self.handle_link_down(event)
833
834
    # pylint: disable=too-many-branches
835
    # pylint: disable=too-many-locals
836 1
    def handle_link_down(self, event):
        """React to a link going down (or entering maintenance).

        EVCs are split into three groups:

        - affected on the current path with a usable failover path: the
          failover path becomes the current path and its flows are
          collected to be installed in bulk;
        - affected on the current path without a usable failover path (or
          whose swap failed): an "evc_affected_by_link_down" event is
          emitted for each one;
        - affected only on the failover path: the failover path is dropped.

        EVCs of the first and last groups are updated in the database and
        sent in a "cleanup_evcs_old_path" event.
        """
        link = event.content["link"]
        log.info("Event handle_link_down %s", link)
        switch_flows = {}
        swapped_evcs = []
        redeploy_evcs = []
        lost_failover_evcs = []
        failover_event_contents = {}

        for evc in self.get_evcs_by_svc_level():
            with evc.lock:
                if not evc.is_affected_by_link(link):
                    # Current path is fine; only the failover may be gone.
                    if evc.is_failover_path_affected_by_link(link):
                        evc.old_path = evc.failover_path
                        evc.failover_path = Path([])
                        lost_failover_evcs.append(evc)
                    continue

                evc.affected_by_link_at = event.timestamp
                # Without a usable failover path, handle link down the
                # traditional way.
                if (
                    not evc.failover_path
                    or evc.is_failover_path_affected_by_link(link)
                ):
                    redeploy_evcs.append(evc)
                    continue

                try:
                    dpid_flows = evc.get_failover_flows()
                    evc.old_path = evc.current_path
                    evc.current_path = evc.failover_path
                    evc.failover_path = Path([])
                # pylint: disable=broad-except
                except Exception:
                    err = traceback.format_exc().replace("\n", ", ")
                    log.error(
                        "Ignore Failover path for "
                        f"{evc} due to error: {err}"
                    )
                    redeploy_evcs.append(evc)
                    continue

                for dpid, flows in dpid_flows.items():
                    switch_flows.setdefault(dpid, []).extend(flows)
                swapped_evcs.append(evc)
                # NOTE(review): the snapshot below holds the flows gathered
                # so far for all EVCs, not only this EVC's dpid_flows -
                # confirm this is what consumers expect.
                failover_event_contents[evc.id] = map_evc_event_content(
                    evc,
                    flows={k: v.copy() for k, v in switch_flows.items()}
                )

        if failover_event_contents:
            emit_event(self.controller, "failover_link_down",
                       content=deepcopy(failover_event_contents))
        send_flow_mods_event(self.controller, switch_flows, 'install')

        for evc in redeploy_evcs:
            emit_event(
                self.controller,
                "evc_affected_by_link_down",
                content={"link": link} | map_evc_event_content(evc)
            )

        evcs_to_update = []
        for evc in swapped_evcs:
            evcs_to_update.append(evc.as_dict())
            log.info(
                f"{evc} redeployed with failover due to link down {link.id}"
            )
        evcs_to_update.extend(evc.as_dict() for evc in lost_failover_evcs)

        self.mongo_controller.update_evcs(evcs_to_update)

        emit_event(
            self.controller,
            "cleanup_evcs_old_path",
            content={"evcs": swapped_evcs + lost_failover_evcs}
        )
913
914 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
    def on_evc_affected_by_link_down(self, event):
        """Listen for evc_affected_by_link_down and delegate the handling."""
        self.handle_evc_affected_by_link_down(event)
918
919 1
    def handle_evc_affected_by_link_down(self, event):
        """Handle the link down for a single EVC.

        The EVC is looked up by ``event.content["evc_id"]``. Nothing is
        done when it is unknown or no longer affected by the link.
        Otherwise "redeployed_link_down" or "error_redeploy_link_down" is
        emitted according to the result of ``evc.handle_link_down()``.
        """
        content = event.content
        evc = self.circuits.get(content["evc_id"])
        link = content['link']
        if not evc:
            return

        with evc.lock:
            if not evc.is_affected_by_link(link):
                return
            redeployed = evc.handle_link_down()

        if redeployed:
            log.info(f"{evc} redeployed due to link down {link.id}")
            event_name = "redeployed_link_down"
        else:
            event_name = "error_redeploy_link_down"
        emit_event(self.controller, event_name,
                   content=map_evc_event_content(evc))
935
936 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
    def on_evc_deployed(self, event):
        """Listen for EVC deployed and redeployed_link_(up|down) events and
        delegate the handling.
        """
        self.handle_evc_deployed(event)
940
941 1
    def handle_evc_deployed(self, event):
        """Try to set up the failover path of a just deployed EVC.

        The EVC is looked up by ``event.content["evc_id"]``; unknown ids
        are ignored.
        """
        evc_id = event.content["evc_id"]
        evc = self.circuits.get(evc_id)
        if evc:
            evc.try_setup_failover_path()
947
948 1
    @listen_to("kytos/mef_eline.cleanup_evcs_old_path")
    def on_cleanup_evcs_old_path(self, event):
        """Listen for cleanup_evcs_old_path and delegate the handling."""
        self.handle_cleanup_evcs_old_path(event)
952
953 1
    def handle_cleanup_evcs_old_path(self, event):
        """Remove the flows of the old path of each given EVC.

        For every EVC in ``event.content["evcs"]`` that has an old path,
        the delete flows for that path are prepared and merged. When at
        least one EVC produced flows, they are sent in a single 'delete'
        flow mods event and a "failover_old_path" event is emitted with the
        content of each cleaned EVC.
        """
        event_contents = defaultdict(list)
        total_flows = {}
        for evc in event.content.get("evcs", []):
            if not evc.old_path:
                continue
            with evc.lock:
                try:
                    removed_flows = merge_flow_dicts(
                        prepare_delete_flow(
                            evc._prepare_nni_flows(evc.old_path)
                        ),
                        prepare_delete_flow(
                            evc._prepare_uni_flows(
                                evc.old_path, skip_in=True
                            )
                        ),
                    )
                # pylint: disable=broad-except
                except Exception:
                    err = traceback.format_exc().replace("\n", ", ")
                    log.error(f"Fail to remove {evc} old_path: {err}")
                    continue
                if not removed_flows:
                    continue
                total_flows = merge_flow_dicts(total_flows, removed_flows)
                event_contents[evc.id] = map_evc_event_content(
                    evc,
                    removed_flows=deepcopy(removed_flows),
                    current_path=evc.current_path.as_dict(),
                )
                # The old path is forgotten once its flows are scheduled
                # for removal.
                evc.old_path = Path([])

        if event_contents:
            send_flow_mods_event(self.controller, total_flows, 'delete')
            emit_event(self.controller, "failover_old_path",
                       content=event_contents)
991
992 1
    @listen_to("kytos/topology.topology_loaded")
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
        """Load all EVCs when the topology_loaded event is received."""
        self.load_all_evcs()
996
997 1
    def load_all_evcs(self):
        """Load the stored EVCs that are not in memory yet.

        After the loading attempt an "evcs_loaded" event is emitted with
        all the stored circuits as content.
        """
        stored = self.mongo_controller.get_circuits()['circuits']
        for circuit_id, circuit_dict in stored.items():
            if circuit_id in self.circuits:
                continue
            self._load_evc(circuit_dict)
        emit_event(self.controller, "evcs_loaded",
                   content=dict(stored.items()), timeout=1)
1005
1006 1
    def _load_evc(self, circuit_dict):
        """Build one EVC from its stored dict and keep it in memory.

        Returns:
            The EVC instance, or None when it could not be built (the error
            is logged) or when it is archived.
        """
        try:
            evc = self._evc_from_dict(circuit_dict)
        except (ValueError, KytosTagError) as exception:
            log.error(
                f"Could not load EVC: dict={circuit_dict} error={exception}"
            )
            return None

        if not evc.archived:
            # setdefault keeps an instance already registered under this id.
            self.circuits.setdefault(evc.id, evc)
            self.sched.add(evc)
            return evc
        return None
1021
1022 1
    @listen_to("kytos/flow_manager.flow.error")
    def on_flow_mod_error(self, event):
        """Listen for flow_manager flow errors and delegate the handling."""
        self.handle_flow_mod_error(event)
1026
1027 1
    def handle_flow_mod_error(self, event):
        """Remove the flows of the EVC related to a failed flow mod.

        Only errors whose ``error_command`` is "add" are considered. The
        EVC is found through the cookie of the flow; unknown EVCs are
        ignored.
        """
        content = event.content
        flow = content["flow"]
        if content.get("error_command") != "add":
            return

        evc_id = EVC.get_id_from_cookie(flow.cookie)
        evc = self.circuits.get(evc_id)
        if not evc:
            return
        with evc.lock:
            evc.remove_current_flows(sync=False)
            evc.remove_failover_flows(sync=True)
1038
1039 1
    def _evc_dict_with_instances(self, evc_dict):
        """Return a copy of the dict with some values turned into instances.

        Converted values, selected by attribute name:
            - names containing "uni" (e.g. uni_a, uni_z): UNI
            - "circuit_scheduler": list of CircuitSchedule
            - names containing "links": list of Link
            - names containing "path", except "dynamic_backup_path": Path

        Raises:
            ValueError: when a UNI could not be created.
        """
        data = evc_dict.copy()  # The original dict is left untouched
        for attribute, value in evc_dict.items():
            if "uni" in attribute:
                try:
                    data[attribute] = self._uni_from_dict(value)
                except ValueError as exception:
                    result = "Error creating UNI: Invalid value"
                    raise ValueError(result) from exception

            if attribute == "circuit_scheduler":
                data[attribute] = [
                    CircuitSchedule.from_dict(schedule) for schedule in value
                ]

            # Ex: primary_links, backup_links, current_links_cache,
            #     primary_links_cache, backup_links_cache
            if "links" in attribute:
                data[attribute] = [
                    self._link_from_dict(link, attribute) for link in value
                ]

            # Ex: current_path, primary_path, backup_path
            if "path" in attribute and attribute != "dynamic_backup_path":
                links = [
                    self._link_from_dict(link, attribute) for link in value
                ]
                data[attribute] = Path(links)

        return data
1080
1081 1
    def _evc_from_dict(self, evc_dict):
        """Return an EVC built from a dict, using this NApp's table_group."""
        kwargs = self._evc_dict_with_instances(evc_dict)
        kwargs["table_group"] = self.table_group
        return EVC(self.controller, **kwargs)
1085
1086 1
    def _uni_from_dict(self, uni_dict):
1087
        """Return a UNI object from python dict."""
1088 1
        if uni_dict is None:
1089 1
            return False
1090
1091 1
        interface_id = uni_dict.get("interface_id")
1092 1
        interface = self.controller.get_interface_by_id(interface_id)
1093 1
        if interface is None:
1094 1
            result = (
1095
                "Error creating UNI:"
1096
                + f"Could not instantiate interface {interface_id}"
1097
            )
1098 1
            raise ValueError(result) from ValueError
1099 1
        tag_convert = {1: "vlan"}
1100 1
        tag_dict = uni_dict.get("tag", None)
1101 1
        if tag_dict:
1102 1
            tag_type = tag_dict.get("tag_type")
1103 1
            tag_type = tag_convert.get(tag_type, tag_type)
1104 1
            tag_value = tag_dict.get("value")
1105 1
            if isinstance(tag_value, list):
1106 1
                tag_value = get_tag_ranges(tag_value)
1107 1
                mask_list = get_vlan_tags_and_masks(tag_value)
1108 1
                tag = TAGRange(tag_type, tag_value, mask_list)
1109
            else:
1110 1
                tag = TAG(tag_type, tag_value)
1111
        else:
1112 1
            tag = None
1113 1
        uni = UNI(interface, tag)
1114 1
        return uni
1115
1116 1
    def _link_from_dict(self, link_dict: dict, attribute: str) -> Link:
        """Build a Link from its dict representation.

        The metadata of the dict is only applied when ``attribute`` is
        "current_path" or "failover_path". A truthy "s_vlan" metadata is
        replaced by the TAG built from it.

        Raises:
            ValueError: when an endpoint interface is not found or the
                "s_vlan" tag could not be instantiated.
        """
        id_a = link_dict.get("endpoint_a").get("id")
        id_b = link_dict.get("endpoint_b").get("id")

        get_interface = self.controller.get_interface_by_id
        endpoint_a = get_interface(id_a)
        endpoint_b = get_interface(id_b)
        if not endpoint_a:
            raise ValueError(f"Could not get interface endpoint_a id {id_a}")
        if not endpoint_b:
            raise ValueError(f"Could not get interface endpoint_b id {id_b}")

        link = Link(endpoint_a, endpoint_b)
        with_metadata = attribute in {"current_path", "failover_path"}
        if "metadata" in link_dict and with_metadata:
            link.extend_metadata(link_dict.get("metadata"))

        s_vlan = link.get_metadata("s_vlan")
        if not s_vlan:
            return link

        tag = TAG.from_dict(s_vlan)
        if tag is False:
            raise ValueError(f"Could not instantiate tag from dict {s_vlan}")
        link.update_metadata("s_vlan", tag)
        return link
1143
1144 1
    def _find_evc_by_schedule_id(self, schedule_id):
1145
        """
1146
        Find an EVC and CircuitSchedule based on schedule_id.
1147
1148
        :param schedule_id: Schedule ID
1149
        :return: EVC and Schedule
1150
        """
1151 1
        circuits = self._get_circuits_buffer()
1152 1
        found_schedule = None
1153 1
        evc = None
1154
1155
        # pylint: disable=unused-variable
1156 1
        for c_id, circuit in circuits.items():
1157 1
            for schedule in circuit.circuit_scheduler:
1158 1
                if schedule.id == schedule_id:
1159 1
                    found_schedule = schedule
1160 1
                    evc = circuit
1161 1
                    break
1162 1
            if found_schedule:
1163 1
                break
1164 1
        return evc, found_schedule
1165
1166 1
    def _get_circuits_buffer(self):
1167
        """
1168
        Return the circuit buffer.
1169
1170
        If the buffer is empty, try to load data from mongodb.
1171
        """
1172 1
        if not self.circuits:
1173
            # Load circuits from mongodb to buffer
1174 1
            circuits = self.mongo_controller.get_circuits()['circuits']
1175 1
            for c_id, circuit in circuits.items():
1176 1
                evc = self._evc_from_dict(circuit)
1177 1
                self.circuits[c_id] = evc
1178 1
        return self.circuits
1179
1180
    # pylint: disable=attribute-defined-outside-init
1181 1
    @alisten_to("kytos/of_multi_table.enable_table")
    async def on_table_enabled(self, event):
        """Update the table group from an enable_table event.

        The "mef_eline" entry of the event content is applied to
        ``self.table_group`` only when every group in it is listed in
        ``settings.TABLE_GROUP_ALLOWED``; otherwise an error is logged and
        nothing changes. After the update a "kytos/mef_eline.enable_table"
        event is emitted with the resulting table group.
        """
        table_group = event.content.get("mef_eline", None)
        if not table_group:
            return

        for group in table_group:
            if group in settings.TABLE_GROUP_ALLOWED:
                continue
            log.error(f'The table group "{group}" is not allowed for '
                      f'mef_eline. Allowed table groups are '
                      f'{settings.TABLE_GROUP_ALLOWED}')
            return

        self.table_group.update(table_group)
        await aemit_event(
            self.controller,
            "kytos/mef_eline.enable_table",
            {"group_table": self.table_group},
        )
1197