Test Failed
Pull Request — master (#424)
by
unknown
04:01
created

build.main.Main.on_flow_mod_error()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 2
dl 0
loc 4
ccs 3
cts 3
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from collections import defaultdict
10 1
from threading import Lock
11
from typing import Optional
12 1
13
from pydantic import ValidationError
14 1
15 1
from kytos.core import KytosNApp, log, rest
16 1
from kytos.core.events import KytosEvent
17 1
from kytos.core.exceptions import KytosTagError
18
from kytos.core.helpers import (alisten_to, listen_to, load_spec, now,
19 1
                                validate_openapi)
20 1
from kytos.core.interface import TAG, UNI, TAGRange
21 1
from kytos.core.link import Link
22
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
23 1
                                 get_json_or_400)
24 1
from kytos.core.tag_ranges import get_tag_ranges
25 1
from napps.kytos.mef_eline import controllers, settings
26
from napps.kytos.mef_eline.exceptions import (DisabledSwitch,
27 1
                                              DuplicatedNoTagUNI, InvalidPath)
28
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
29 1
                                          Path)
30 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
31
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
32
                                         emit_event, get_vlan_tags_and_masks,
33
                                         map_evc_event_content)
34
35
36 1
# pylint: disable=too-many-public-methods
37
class Main(KytosNApp):
38
    """Main class of amlight/mef_eline NApp.
39
40
    This class is the entry point for this napp.
41
    """
42 1
43
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
44 1
45
    def setup(self):
46
        """Replace the '__init__' method for the KytosNApp subclass.
47
48
        The setup method is automatically called by the controller when your
49
        application is loaded.
50
51
        So, if you have any setup routine, insert it here.
52
        """
53 1
        # object used to scheduler circuit events
54
        self.sched = Scheduler()
55
56 1
        # object to save and load circuits
57 1
        self.mongo_controller = self.get_eline_controller()
58
        self.mongo_controller.bootstrap_indexes()
59
60 1
        # set the controller that will manager the dynamic paths
61
        DynamicPathManager.set_controller(self.controller)
62
63
        # dictionary of EVCs created. It acts as a circuit buffer.
64 1
        # Every create/update/delete must be synced to mongodb.
65
        self.circuits = {}
66 1
67 1
        self._intf_events = defaultdict(dict)
68 1
        self.lock_interfaces = defaultdict(Lock)
69
        self.table_group = {"epl": 0, "evpl": 0}
70 1
        self._lock = Lock()
71 1
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)
72
73 1
        self.load_all_evcs()
74
        self._topology_updated_at = None
75
76
    def get_evcs_by_svc_level(self) -> list:
77
        """Get circuits sorted by desc service level and asc creation_time.
78 1
79
        In the future, as more ops are offloaded it should be get from the DB.
80
        """
81 1
        return sorted(self.circuits.values(),
82 1
                      key=lambda x: (-x.service_level, x.creation_time))
83
84
    @staticmethod
85
    def get_eline_controller():
86 1
        """Return the ELineController instance."""
87
        return controllers.ELineController()
88 1
89 1
    def execute(self):
90 1
        """Execute once when the napp is running."""
91 1
        if self._lock.locked():
92 1
            return
93 1
        log.debug("Starting consistency routine")
94
        with self._lock:
95 1
            self.execute_consistency()
96
        log.debug("Finished consistency routine")
97
98 1
    def should_be_checked(self, circuit):
99
        "Verify if the circuit meets the necessary conditions to be checked"
100
        # pylint: disable=too-many-boolean-expressions
101
        if (
102
                circuit.is_enabled()
103
                and not circuit.is_active()
104
                and not circuit.lock.locked()
105
                and not circuit.has_recent_removed_flow()
106
                and not circuit.is_recent_updated()
107
                and circuit.are_unis_active(self.controller.switches)
108
                # if a inter-switch EVC does not have current_path, it does not
109 1
                # make sense to run sdntrace on it
110
                and (circuit.is_intra_switch() or circuit.current_path)
111
                ):
112 1
            return True
113
        return False
114 1
115 1
    def execute_consistency(self):
116 1
        """Execute consistency routine."""
117 1
        circuits_to_check = []
118 1
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
119 1
        for circuit in self.get_evcs_by_svc_level():
120 1
            stored_circuits.pop(circuit.id, None)
121 1
            if self.should_be_checked(circuit):
122 1
                circuits_to_check.append(circuit)
123 1
        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
124 1
        for circuit in circuits_to_check:
125 1
            is_checked = circuits_checked.get(circuit.id)
126 1
            if is_checked:
127 1
                circuit.execution_rounds = 0
128 1
                log.info(f"{circuit} enabled but inactive - activating")
129
                with circuit.lock:
130 1
                    circuit.activate()
131 1
                    circuit.sync()
132 1
            else:
133 1
                circuit.execution_rounds += 1
134 1
                if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
135 1
                    log.info(f"{circuit} enabled but inactive - redeploy")
136 1
                    with circuit.lock:
137 1
                        circuit.deploy()
138
        for circuit_id in stored_circuits:
139 1
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
140
            self._load_evc(stored_circuits[circuit_id])
141
142
    def shutdown(self):
143
        """Execute when your napp is unloaded.
144
145 1
        If you have some cleanup procedure, insert it here.
146 1
        """
147
148
    @rest("/v2/evc/", methods=["GET"])
149
    def list_circuits(self, request: Request) -> JSONResponse:
150
        """Endpoint to return circuits stored.
151
152 1
        archive query arg if defined (not null) will be filtered
153 1
        accordingly, by default only non archived evcs will be listed
154 1
        """
155 1
        log.debug("list_circuits /v2/evc")
156 1
        args = request.query_params
157
        archived = args.get("archived", "false").lower()
158 1
        args = {k: v for k, v in args.items() if k not in {"archived"}}
159 1
        circuits = self.mongo_controller.get_circuits(archived=archived,
160
                                                      metadata=args)
161 1
        circuits = circuits['circuits']
162 1
        return JSONResponse(circuits)
163
164
    @rest("/v2/evc/schedule", methods=["GET"])
165
    def list_schedules(self, _request: Request) -> JSONResponse:
166
        """Endpoint to return all schedules stored for all circuits.
167
168
        Return a JSON with the following template:
169
        [{"schedule_id": <schedule_id>,
170 1
         "circuit_id": <circuit_id>,
171 1
         "schedule": <schedule object>}]
172 1
        """
173 1
        log.debug("list_schedules /v2/evc/schedule")
174 1
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
175 1
        if not circuits:
176
            result = {}
177 1
            status = 200
178 1
            return JSONResponse(result, status_code=status)
179 1
180 1
        result = []
181 1
        status = 200
182 1
        for circuit in circuits:
183 1
            circuit_scheduler = circuit.get("circuit_scheduler")
184
            if circuit_scheduler:
185
                for scheduler in circuit_scheduler:
186
                    value = {
187
                        "schedule_id": scheduler.get("id"),
188 1
                        "circuit_id": circuit.get("id"),
189
                        "schedule": scheduler,
190 1
                    }
191 1
                    result.append(value)
192
193 1
        log.debug("list_schedules result %s %s", result, status)
194 1
        return JSONResponse(result, status_code=status)
195
196 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
197 1
    def get_circuit(self, request: Request) -> JSONResponse:
198 1
        """Endpoint to return a circuit based on id."""
199 1
        circuit_id = request.path_params["circuit_id"]
200 1
        log.debug("get_circuit /v2/evc/%s", circuit_id)
201 1
        circuit = self.mongo_controller.get_circuit(circuit_id)
202 1
        if not circuit:
203 1
            result = f"circuit_id {circuit_id} not found"
204 1
            log.debug("get_circuit result %s %s", result, 404)
205 1
            raise HTTPException(404, detail=result)
206
        status = 200
207
        log.debug("get_circuit result %s %s", circuit, status)
208 1
        return JSONResponse(circuit, status_code=status)
209 1
210 1
    # pylint: disable=too-many-branches, too-many-statements
211
    @rest("/v2/evc/", methods=["POST"])
212
    @validate_openapi(spec)
213
    def create_circuit(self, request: Request) -> JSONResponse:
214
        """Try to create a new circuit.
215
216
        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
217
        UNI_Z's requested C-VID are available from the interfaces' pools. This
218
        is checked when creating the UNI object.
219
220
        Then, E-Line NApp requests a primary and a backup path to the
221
        Pathfinder NApp using the attributes primary_links and backup_links
222
        submitted via REST
223
224
        # For each link composing paths in #3:
225
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
226
        #  - Using the S-VID obtained, generate abstract flow entries to be
227
        #    sent to FlowManager
228
229
        Push abstract flow entries to FlowManager and FlowManager pushes
230
        OpenFlow entries to datapaths
231
232
        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
233
        creation
234
235 1
        Finnaly, notify user of the status of its request.
236 1
        """
237
        # Try to create the circuit object
238 1
        log.debug("create_circuit /v2/evc/")
239 1
        data = get_json_or_400(request, self.controller.loop)
240 1
241 1
        try:
242 1
            evc = self._evc_from_dict(data)
243 1
        except (ValueError, KytosTagError) as exception:
244 1
            log.debug("create_circuit result %s %s", exception, 400)
245 1
            raise HTTPException(400, detail=str(exception)) from exception
246 1
        try:
247 1
            check_disabled_component(evc.uni_a, evc.uni_z)
248
        except DisabledSwitch as exception:
249
            log.debug("create_circuit result %s %s", exception, 409)
250
            raise HTTPException(
251
                    409,
252 1
                    detail=f"Path is not valid: {exception}"
253 1
                ) from exception
254 1
255
        if evc.primary_path:
256
            try:
257
                evc.primary_path.is_valid(
258
                    evc.uni_a.interface.switch,
259 1
                    evc.uni_z.interface.switch,
260 1
                    bool(evc.circuit_scheduler),
261
                )
262
            except InvalidPath as exception:
263
                raise HTTPException(
264 1
                    400,
265 1
                    detail=f"primary_path is not valid: {exception}"
266 1
                ) from exception
267
        if evc.backup_path:
268
            try:
269
                evc.backup_path.is_valid(
270
                    evc.uni_a.interface.switch,
271 1
                    evc.uni_z.interface.switch,
272 1
                    bool(evc.circuit_scheduler),
273
                )
274
            except InvalidPath as exception:
275
                raise HTTPException(
276
                    400,
277 1
                    detail=f"backup_path is not valid: {exception}"
278 1
                ) from exception
279 1
280
        if not evc._tag_lists_equal():
281 1
            detail = "UNI_A and UNI_Z tag lists should be the same."
282 1
            raise HTTPException(400, detail=detail)
283 1
284 1
        try:
285
            evc._validate_has_primary_or_dynamic()
286 1
        except ValueError as exception:
287 1
            raise HTTPException(400, detail=str(exception)) from exception
288
289
        try:
290
            self._check_no_tag_duplication(evc.id, evc.uni_a, evc.uni_z)
291
        except DuplicatedNoTagUNI as exception:
292 1
            log.debug("create_circuit result %s %s", exception, 409)
293 1
            raise HTTPException(409, detail=str(exception)) from exception
294 1
295 1
        try:
296
            self._use_uni_tags(evc)
297
        except KytosTagError as exception:
298 1
            raise HTTPException(400, detail=str(exception)) from exception
299 1
300
        # save circuit
301
        try:
302
            evc.sync()
303
        except ValidationError as exception:
304 1
            raise HTTPException(400, detail=str(exception)) from exception
305
306
        # store circuit in dictionary
307 1
        self.circuits[evc.id] = evc
308
309
        # Schedule the circuit deploy
310 1
        self.sched.add(evc)
311 1
312 1
        # Circuit has no schedule, deploy now
313
        if not evc.circuit_scheduler:
314
            with evc.lock:
315 1
                evc.deploy()
316 1
317 1
        # Notify users
318 1
        result = {"circuit_id": evc.id}
319
        status = 201
320 1
        log.debug("create_circuit result %s %s", result, status)
321
        emit_event(self.controller, name="created",
322 1
                   content=map_evc_event_content(evc))
323 1
        return JSONResponse(result, status_code=status)
324 1
325 1
    @staticmethod
326 1
    def _use_uni_tags(evc):
327 1
        uni_a = evc.uni_a
328 1
        evc._use_uni_vlan(uni_a)
329 1
        try:
330 1
            uni_z = evc.uni_z
331 1
            evc._use_uni_vlan(uni_z)
332
        except KytosTagError as err:
333 1
            evc.make_uni_vlan_available(uni_a)
334 1
            raise err
335
336
    @listen_to('kytos/flow_manager.flow.removed')
337
    def on_flow_delete(self, event):
338 1
        """Capture delete messages to keep track when flows got removed."""
339
        self.handle_flow_delete(event)
340 1
341 1
    def handle_flow_delete(self, event):
342 1
        """Keep track when the EVC got flows removed by deriving its cookie."""
343 1
        flow = event.content["flow"]
344 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
345
        if evc:
346 1
            log.debug("Flow removed in EVC %s", evc.id)
347 1
            evc.set_flow_removed_at()
348 1
349
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
350
    @validate_openapi(spec)
351
    def update(self, request: Request) -> JSONResponse:
352
        """Update a circuit based on payload.
353
354 1
        The EVC attributes (creation_time, active, current_path,
355 1
        failover_path, _id, archived) can't be updated.
356 1
        """
357 1
        data = get_json_or_400(request, self.controller.loop)
358 1
        circuit_id = request.path_params["circuit_id"]
359 1
        log.debug("update /v2/evc/%s", circuit_id)
360 1
        try:
361 1
            evc = self.circuits[circuit_id]
362 1
        except KeyError:
363
            result = f"circuit_id {circuit_id} not found"
364 1
            log.debug("update result %s %s", result, 404)
365 1
            raise HTTPException(404, detail=result) from KeyError
366 1
367 1
        if evc.archived:
368
            result = "Can't update archived EVC"
369 1
            log.debug("update result %s %s", result, 409)
370 1
            raise HTTPException(409, detail=result)
371 1
372
        try:
373
            updated_data = self._evc_dict_with_instances(data)
374
            self._check_no_tag_duplication(
375 1
                circuit_id, updated_data.get("uni_a"),
376 1
                updated_data.get("uni_z")
377 1
            )
378 1
            enable, redeploy = evc.update(**updated_data)
379 1
        except (ValueError, KytosTagError, ValidationError) as exception:
380
            log.debug("update result %s %s", exception, 400)
381
            raise HTTPException(400, detail=str(exception)) from exception
382 1
        except DuplicatedNoTagUNI as exception:
383 1
            log.debug("update result %s %s", exception, 409)
384 1
            raise HTTPException(409, detail=str(exception)) from exception
385
        except DisabledSwitch as exception:
386
            log.debug("update result %s %s", exception, 409)
387
            raise HTTPException(
388
                    409,
389 1
                    detail=f"Path is not valid: {exception}"
390
                ) from exception
391
392
        if evc.is_active():
393
            if enable is False:  # disable if active
394
                with evc.lock:
395
                    evc.remove()
396
            elif redeploy is not None:  # redeploy if active
397
                with evc.lock:
398 1
                    evc.remove()
399 1
                    evc.deploy()
400 1
        else:
401 1
            if enable is True:  # enable if inactive
402 1
                with evc.lock:
403 1
                    evc.deploy()
404 1
            elif evc.is_enabled() and redeploy:
405 1
                with evc.lock:
406 1
                    evc.remove()
407
                    evc.deploy()
408 1
        result = {evc.id: evc.as_dict()}
409 1
        status = 200
410
411 1
        log.debug("update result %s %s", result, status)
412
        emit_event(self.controller, "updated",
413 1
                   content=map_evc_event_content(evc, **data))
414 1
        return JSONResponse(result, status_code=status)
415
416
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
417
    def delete_circuit(self, request: Request) -> JSONResponse:
418
        """Remove a circuit.
419
420 1
        First, the flows are removed from the switches, and then the EVC is
421 1
        disabled.
422 1
        """
423 1
        circuit_id = request.path_params["circuit_id"]
424 1
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
425 1
        try:
426 1
            evc = self.circuits[circuit_id]
427 1
        except KeyError:
428
            result = f"circuit_id {circuit_id} not found"
429 1
            log.debug("delete_circuit result %s %s", result, 404)
430 1
            raise HTTPException(404, detail=result) from KeyError
431 1
432 1
        if evc.archived:
433
            result = f"Circuit {circuit_id} already removed"
434 1
            log.debug("delete_circuit result %s %s", result, 404)
435 1
            raise HTTPException(404, detail=result)
436 1
437 1
        log.info("Removing %s", evc)
438 1
        with evc.lock:
439 1
            evc.remove_current_flows()
440 1
            evc.remove_failover_flows(sync=False)
441 1
            evc.deactivate()
442 1
            evc.disable()
443 1
            self.sched.remove(evc)
444 1
            evc.archive()
445 1
            evc.remove_uni_tags()
446 1
            evc.sync()
447
        log.info("EVC removed. %s", evc)
448 1
        result = {"response": f"Circuit {circuit_id} removed"}
449 1
        status = 200
450
451 1
        log.debug("delete_circuit result %s %s", result, status)
452
        emit_event(self.controller, "deleted",
453 1
                   content=map_evc_event_content(evc))
454 1
        return JSONResponse(result, status_code=status)
455
456 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
457 1
    def get_metadata(self, request: Request) -> JSONResponse:
458 1
        """Get metadata from an EVC."""
459
        circuit_id = request.path_params["circuit_id"]
460
        try:
461
            return (
462
                JSONResponse({"metadata": self.circuits[circuit_id].metadata})
463
            )
464
        except KeyError as error:
465
            raise HTTPException(
466
                404,
467 1
                detail=f"circuit_id {circuit_id} not found."
468 1
            ) from error
469 1
470 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
471 1
    @validate_openapi(spec)
472 1
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
473
        """Add metadata to a bulk of EVCs."""
474 1
        data = get_json_or_400(request, self.controller.loop)
475
        circuit_ids = data.pop("circuit_ids")
476 1
477 1
        self.mongo_controller.update_evcs(circuit_ids, data, "add")
478 1
479 1
        fail_evcs = []
480 1
        for _id in circuit_ids:
481 1
            try:
482 1
                evc = self.circuits[_id]
483
                evc.extend_metadata(data)
484 1
            except KeyError:
485 1
                fail_evcs.append(_id)
486 1
487
        if fail_evcs:
488 1
            raise HTTPException(404, detail=fail_evcs)
489 1
        return JSONResponse("Operation successful", status_code=201)
490 1
491
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
492 1
    @validate_openapi(spec)
493 1
    def add_metadata(self, request: Request) -> JSONResponse:
494 1
        """Add metadata to an EVC."""
495
        circuit_id = request.path_params["circuit_id"]
496 1
        metadata = get_json_or_400(request, self.controller.loop)
497 1
        if not isinstance(metadata, dict):
498 1
            raise HTTPException(400, "Invalid metadata value: {metadata}")
499 1
        try:
500
            evc = self.circuits[circuit_id]
501
        except KeyError as error:
502
            raise HTTPException(
503
                404,
504 1
                detail=f"circuit_id {circuit_id} not found."
505 1
            ) from error
506 1
507
        evc.extend_metadata(metadata)
508 1
        evc.sync()
509 1
        return JSONResponse("Operation successful", status_code=201)
510 1
511 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
512 1
    @validate_openapi(spec)
513 1
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
514 1
        """Delete metada from a bulk of EVCs"""
515 1
        data = get_json_or_400(request, self.controller.loop)
516
        key = request.path_params["key"]
517 1
        circuit_ids = data.pop("circuit_ids")
518 1
        self.mongo_controller.update_evcs(circuit_ids, {key: ""}, "del")
519 1
520 1
        fail_evcs = []
521 1
        for _id in circuit_ids:
522 1
            try:
523 1
                evc = self.circuits[_id]
524
                evc.remove_metadata(key)
525 1
            except KeyError:
526 1
                fail_evcs.append(_id)
527 1
528
        if fail_evcs:
529 1
            raise HTTPException(404, detail=fail_evcs)
530 1
        return JSONResponse("Operation successful")
531
532 1
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
533 1
    def delete_metadata(self, request: Request) -> JSONResponse:
534 1
        """Delete metadata from an EVC."""
535 1
        circuit_id = request.path_params["circuit_id"]
536 1
        key = request.path_params["key"]
537 1
        try:
538
            evc = self.circuits[circuit_id]
539
        except KeyError as error:
540
            raise HTTPException(
541
                404,
542 1
                detail=f"circuit_id {circuit_id} not found."
543 1
            ) from error
544 1
545
        evc.remove_metadata(key)
546 1
        evc.sync()
547 1
        return JSONResponse("Operation successful")
548
549 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
550 1
    def redeploy(self, request: Request) -> JSONResponse:
551 1
        """Endpoint to force the redeployment of an EVC."""
552 1
        circuit_id = request.path_params["circuit_id"]
553 1
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
554 1
        try:
555
            evc = self.circuits[circuit_id]
556
        except KeyError:
557
            raise HTTPException(
558 1
                404,
559 1
                detail=f"circuit_id {circuit_id} not found"
560 1
            ) from KeyError
561 1
        if evc.is_enabled():
562 1
            with evc.lock:
563 1
                evc.remove_current_flows()
564
                evc.deploy()
565 1
            result = {"response": f"Circuit {circuit_id} redeploy received."}
566 1
            status = 202
567
        else:
568 1
            result = {"response": f"Circuit {circuit_id} is disabled."}
569
            status = 409
570 1
571 1
        return JSONResponse(result, status_code=status)
572 1
573
    @rest("/v2/evc/schedule/", methods=["POST"])
574
    @validate_openapi(spec)
575
    def create_schedule(self, request: Request) -> JSONResponse:
576
        """
577
        Create a new schedule for a given circuit.
578
579
        This service do no check if there are conflicts with another schedule.
580
        Payload example:
581
            {
582
              "circuit_id":"aa:bb:cc",
583
              "schedule": {
584
                "date": "2019-08-07T14:52:10.967Z",
585
                "interval": "string",
586
                "frequency": "1 * * * *",
587
                "action": "create"
588 1
              }
589 1
            }
590 1
        """
591 1
        log.debug("create_schedule /v2/evc/schedule/")
592
        data = get_json_or_400(request, self.controller.loop)
593
        circuit_id = data["circuit_id"]
594 1
        schedule_data = data["schedule"]
595
596
        # Get EVC from circuits buffer
597 1
        circuits = self._get_circuits_buffer()
598
599
        # get the circuit
600 1
        evc = circuits.get(circuit_id)
601 1
602 1
        # get the circuit
603 1
        if not evc:
604
            result = f"circuit_id {circuit_id} not found"
605 1
            log.debug("create_schedule result %s %s", result, 404)
606 1
            raise HTTPException(404, detail=result)
607 1
        # Can not modify circuits deleted and archived
608 1
        if evc.archived:
609
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
610
            log.debug("create_schedule result %s %s", result, 409)
611 1
            raise HTTPException(409, detail=result)
612
613
        # new schedule from dict
614 1
        new_schedule = CircuitSchedule.from_dict(schedule_data)
615 1
616
        # If there is no schedule, create the list
617
        if not evc.circuit_scheduler:
618 1
            evc.circuit_scheduler = []
619
620
        # Add the new schedule
621 1
        evc.circuit_scheduler.append(new_schedule)
622
623
        # Add schedule job
624 1
        self.sched.add_circuit_job(evc, new_schedule)
625
626 1
        # save circuit to mongodb
627 1
        evc.sync()
628
629 1
        result = new_schedule.as_dict()
630 1
        status = 201
631
632 1
        log.debug("create_schedule result %s %s", result, status)
633 1
        return JSONResponse(result, status_code=status)
634 1
635
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
636
    @validate_openapi(spec)
637
    def update_schedule(self, request: Request) -> JSONResponse:
638
        """Update a schedule.
639
640
        Change all attributes from the given schedule from a EVC circuit.
641
        The schedule ID is preserved as default.
642
        Payload example:
643
            {
644
              "date": "2019-08-07T14:52:10.967Z",
645
              "interval": "string",
646
              "frequency": "1 * * *",
647 1
              "action": "create"
648 1
            }
649 1
        """
650
        data = get_json_or_400(request, self.controller.loop)
651
        schedule_id = request.path_params["schedule_id"]
652 1
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)
653
654
        # Try to find a circuit schedule
655 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
656 1
657 1
        # Can not modify circuits deleted and archived
658 1
        if not found_schedule:
659 1
            result = f"schedule_id {schedule_id} not found"
660 1
            log.debug("update_schedule result %s %s", result, 404)
661 1
            raise HTTPException(404, detail=result)
662 1
        if evc.archived:
663
            result = f"Circuit {evc.id} is archived. Update is forbidden."
664 1
            log.debug("update_schedule result %s %s", result, 409)
665 1
            raise HTTPException(409, detail=result)
666
667 1
        new_schedule = CircuitSchedule.from_dict(data)
668
        new_schedule.id = found_schedule.id
669 1
        # Remove the old schedule
670
        evc.circuit_scheduler.remove(found_schedule)
671
        # Append the modified schedule
672 1
        evc.circuit_scheduler.append(new_schedule)
673
674 1
        # Cancel all schedule jobs
675
        self.sched.cancel_job(found_schedule.id)
676 1
        # Add the new circuit schedule
677
        self.sched.add_circuit_job(evc, new_schedule)
678 1
        # Save EVC to mongodb
679 1
        evc.sync()
680
681 1
        result = new_schedule.as_dict()
682 1
        status = 200
683
684 1
        log.debug("update_schedule result %s %s", result, status)
685 1
        return JSONResponse(result, status_code=status)
686
687
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
688
    def delete_schedule(self, request: Request) -> JSONResponse:
689
        """Remove a circuit schedule.
690
691
        Remove the Schedule from EVC.
692 1
        Remove the Schedule from cron job.
693 1
        Save the EVC to the Storehouse.
694 1
        """
695
        schedule_id = request.path_params["schedule_id"]
696
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
697 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
698 1
699 1
        # Can not modify circuits deleted and archived
700 1
        if not found_schedule:
701
            result = f"schedule_id {schedule_id} not found"
702 1
            log.debug("delete_schedule result %s %s", result, 404)
703 1
            raise HTTPException(404, detail=result)
704 1
705 1
        if evc.archived:
706
            result = f"Circuit {evc.id} is archived. Update is forbidden."
707
            log.debug("delete_schedule result %s %s", result, 409)
708 1
            raise HTTPException(409, detail=result)
709
710
        # Remove the old schedule
711 1
        evc.circuit_scheduler.remove(found_schedule)
712
713 1
        # Cancel all schedule jobs
714
        self.sched.cancel_job(found_schedule.id)
715 1
        # Save EVC to mongodb
716 1
        evc.sync()
717
718 1
        result = "Schedule removed"
719 1
        status = 200
720
721 1
        log.debug("delete_schedule result %s %s", result, status)
722
        return JSONResponse(result, status_code=status)
723
724
    def _check_no_tag_duplication(
725
        self,
726
        evc_id: str,
727
        uni_a: Optional[UNI] = None,
728
        uni_z: Optional[UNI] = None
729
    ):
730
        """Check if the given EVC has UNIs with no tag and if these are
731
         duplicated. Raise DuplicatedNoTagUNI if duplication is found.
732
        Args:
733
            evc (dict): EVC to be analyzed.
734 1
        """
735 1
736
        # No UNIs
737 1
        if not (uni_a or uni_z):
738
            return
739 1
740 1
        if (not (uni_a and not uni_a.user_tag) and
741 1
                not (uni_z and not uni_z.user_tag)):
742 1
            return
743 1
        for circuit in self.circuits.copy().values():
744 1
            if (not circuit.archived and circuit._id != evc_id):
745 1
                if uni_a and uni_a.user_tag is None:
746
                    circuit.check_no_tag_duplicate(uni_a)
747 1
                if uni_z and uni_z.user_tag is None:
748 1
                    circuit.check_no_tag_duplicate(uni_z)
749
750
    @listen_to("kytos/topology.link_up")
751
    def on_link_up(self, event):
752 1
        """Change circuit when link is up or end_maintenance."""
753
        self.handle_link_up(event)
754 1
755 1
    def handle_link_up(self, event):
756 1
        """Change circuit when link is up or end_maintenance."""
757 1
        log.info("Event handle_link_up %s", event.content["link"])
758 1
        for evc in self.get_evcs_by_svc_level():
759
            if evc.is_enabled() and not evc.archived:
760
                with evc.lock:
761 1
                    evc.handle_link_up(event.content["link"])
762
763
    # Possibly replace this with interruptions?
764
    @listen_to(
765 1
        '.*.switch.interface.(link_up|link_down|created|deleted)'
766
    )
767
    def on_interface_link_change(self, event: KytosEvent):
768
        """
769
        Handler for interface link_up and link_down events.
770
        """
771
        self.handle_on_interface_link_change(event)
772
773
    def handle_on_interface_link_change(self, event: KytosEvent):
774
        """
775
        Handler to sort interface events {link_(up, down), create, deleted}
776
777 1
        To avoid multiple database updated (link flap):
778
        Every interface is identfied and processed in parallel.
779
        Once an interface event is received a time is started.
780
        While time is running self._intf_events will be updated.
781
        After time has passed last received event will be processed.
782
        """
783
        iface = event.content.get("interface")
784
        with self.lock_interfaces[iface.id]:
785
            _now = event.timestamp
786
            # Return out of order events
787 1
            if (
788
                iface.id in self._intf_events
789
                and self._intf_events[iface.id]["event"].timestamp > _now
790
            ):
791
                return
792
            self._intf_events[iface.id].update({"event": event})
793
            if "last_acquired" in self._intf_events[iface.id]:
794
                return
795
            self._intf_events[iface.id].update({"last_acquired": now()})
796
        time.sleep(settings.UNI_STATE_CHANGE_DELAY)
797 1
        with self.lock_interfaces[iface.id]:
798 1
            event = self._intf_events[iface.id]["event"]
799
            self._intf_events[iface.id].pop('last_acquired', None)
800
            _, _, event_type = event.name.rpartition('.')
801
            if event_type in ('link_up', 'created'):
802 1
                self.handle_interface_link_up(iface)
803
            elif event_type in ('link_down', 'deleted'):
804 1
                self.handle_interface_link_down(iface)
805 1
806 1
    def handle_interface_link_up(self, interface):
807 1
        """
808 1
        Handler for interface link_up events
809 1
        """
810 1
        for evc in self.get_evcs_by_svc_level():
811 1
            log.info("Event handle_interface_link_up %s", interface)
812
            evc.handle_interface_link_up(
813
                interface
814 1
            )
815
816
    def handle_interface_link_down(self, interface):
817
        """
818 1
        Handler for interface link_down events
819 1
        """
820 1
        for evc in self.get_evcs_by_svc_level():
821 1
            log.info("Event handle_interface_link_down %s", interface)
822
            evc.handle_interface_link_down(
823 1
                interface
824 1
            )
825 1
826
    @listen_to("kytos/topology.link_down")
827
    def on_link_down(self, event):
828 1
        """Change circuit when link is down or under_mantenance."""
829 1
        self.handle_link_down(event)
830 1
831 1
    def handle_link_down(self, event):  # pylint: disable=too-many-branches
832 1
        """Change circuit when link is down or under_mantenance."""
833 1
        link = event.content["link"]
834
        log.info("Event handle_link_down %s", link)
835 1
        switch_flows = {}
836
        evcs_with_failover = []
837 1
        evcs_normal = []
838 1
        check_failover = []
839 1
        for evc in self.get_evcs_by_svc_level():
840 1
            if evc.is_affected_by_link(link):
841 1
                # if there is no failover path, handles link down the
842
                # tradditional way
843
                if (
844
                    not getattr(evc, 'failover_path', None) or
845
                    evc.is_failover_path_affected_by_link(link)
846
                ):
847
                    evcs_normal.append(evc)
848
                    continue
849
                try:
850 1
                    dpid_flows = evc.get_failover_flows()
851 1
                # pylint: disable=broad-except
852 1
                except Exception:
853 1
                    err = traceback.format_exc().replace("\n", ", ")
854 1
                    log.error(
855
                        f"Ignore Failover path for {evc} due to error: {err}"
856 1
                    )
857 1
                    evcs_normal.append(evc)
858 1
                    continue
859 1
                for dpid, flows in dpid_flows.items():
860 1
                    switch_flows.setdefault(dpid, [])
861 1
                    switch_flows[dpid].extend(flows)
862 1
                evcs_with_failover.append(evc)
863
            else:
864 1
                check_failover.append(evc)
865
866
        while switch_flows:
867
            offset = settings.BATCH_SIZE or None
868 1
            switches = list(switch_flows.keys())
869 1
            for dpid in switches:
870
                emit_event(
871
                    self.controller,
872
                    context="kytos.flow_manager",
873
                    name="flows.install",
874
                    content={
875
                        "dpid": dpid,
876
                        "flow_dict": {"flows": switch_flows[dpid][:offset]},
877
                    }
878
                )
879 1
                if offset is None or offset >= len(switch_flows[dpid]):
880 1
                    del switch_flows[dpid]
881 1
                    continue
882 1
                switch_flows[dpid] = switch_flows[dpid][offset:]
883
            time.sleep(settings.BATCH_INTERVAL)
884 1
885 1
        for evc in evcs_with_failover:
886
            with evc.lock:
887
                old_path = evc.current_path
888
                evc.current_path = evc.failover_path
889 1
                evc.failover_path = old_path
890
                evc.sync()
891 1
            emit_event(self.controller, "redeployed_link_down",
892 1
                       content=map_evc_event_content(evc))
893 1
            log.info(
894 1
                f"{evc} redeployed with failover due to link down {link.id}"
895 1
            )
896 1
897 1
        for evc in evcs_normal:
898 1
            emit_event(
899 1
                self.controller,
900 1
                "evc_affected_by_link_down",
901 1
                content={"link_id": link.id} | map_evc_event_content(evc)
902
            )
903
904 1
        # After handling the hot path, check if new failover paths are needed.
905 1
        # Note that EVCs affected by link down will generate a KytosEvent for
906
        # deployed|redeployed, which will trigger the failover path setup.
907
        # Thus, we just need to further check the check_failover list
908
        for evc in check_failover:
909 1
            if evc.is_failover_path_affected_by_link(link):
910
                with evc.lock:
911 1
                    evc.setup_failover_path()
912 1
913 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
914 1
    def on_evc_affected_by_link_down(self, event):
915 1
        """Change circuit when link is down or under_mantenance."""
916
        self.handle_evc_affected_by_link_down(event)
917 1
918 1
    def handle_evc_affected_by_link_down(self, event):
919
        """Change circuit when link is down or under_mantenance."""
920
        evc = self.circuits.get(event.content["evc_id"])
921
        link_id = event.content['link_id']
922 1
        if not evc:
923
            return
924 1
        with evc.lock:
925 1
            result = evc.handle_link_down()
926 1
        event_name = "error_redeploy_link_down"
927 1
        if result:
928 1
            log.info(f"{evc} redeployed due to link down {link_id}")
929
            event_name = "redeployed_link_down"
930
        emit_event(self.controller, event_name,
931 1
                   content=map_evc_event_content(evc))
932
933 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
934 1
    def on_evc_deployed(self, event):
935 1
        """Handle EVC deployed|redeployed_link_down."""
936 1
        self.handle_evc_deployed(event)
937
938
    def handle_evc_deployed(self, event):
939 1
        """Setup failover path on evc deployed."""
940 1
        evc = self.circuits.get(event.content["evc_id"])
941 1
        if not evc:
942
            return
943 1
        with evc.lock:
944 1
            evc.setup_failover_path()
945 1
946
    @listen_to("kytos/topology.topology_loaded")
947 1
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
948 1
        """Load EVCs once the topology is available."""
949
        self.load_all_evcs()
950
951
    def load_all_evcs(self):
952 1
        """Try to load all EVCs on startup."""
953
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
954 1
        for circuit_id, circuit in circuits:
955 1
            if circuit_id not in self.circuits:
956 1
                self._load_evc(circuit)
957
        emit_event(self.controller, "evcs_loaded", content=dict(circuits),
958 1
                   timeout=1)
959 1
960 1
    def _load_evc(self, circuit_dict):
961 1
        """Load one EVC from mongodb to memory."""
962
        try:
963 1
            evc = self._evc_from_dict(circuit_dict)
964
        except (ValueError, KytosTagError) as exception:
965
            log.error(
966
                f"Could not load EVC: dict={circuit_dict} error={exception}"
967
            )
968 1
            return None
969 1
        if evc.archived:
970
            return None
971
972 1
        self.circuits.setdefault(evc.id, evc)
973 1
        self.sched.add(evc)
974 1
        return evc
975 1
976 1
    @listen_to("kytos/flow_manager.flow.error")
977 1
    def on_flow_mod_error(self, event):
978
        """Handle flow mod errors related to an EVC."""
979 1
        self.handle_flow_mod_error(event)
980 1
981 1
    def handle_flow_mod_error(self, event):
982 1
        """Handle flow mod errors related to an EVC."""
983
        flow = event.content["flow"]
984
        command = event.content.get("error_command")
985
        if command != "add":
986
            return
987
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
988
        if evc:
989
            with evc.lock:
990 1
                evc.remove_current_flows()
991 1
992
    def _evc_dict_with_instances(self, evc_dict):
993
        """Convert some dict values to instance of EVC classes.
994
995
        This method will convert: [UNI, Link]
996
        """
997
        data = evc_dict.copy()  # Do not modify the original dict
998 1
        for attribute, value in data.items():
999 1
            # Get multiple attributes.
1000
            # Ex: uni_a, uni_z
1001
            if "uni" in attribute:
1002
                try:
1003 1
                    data[attribute] = self._uni_from_dict(value)
1004
                except ValueError as exception:
1005 1
                    result = "Error creating UNI: Invalid value"
1006 1
                    raise ValueError(result) from exception
1007 1
1008 1
            if attribute == "circuit_scheduler":
1009
                data[attribute] = []
1010 1
                for schedule in value:
1011
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
1012 1
1013 1
            # Get multiple attributes.
1014
            # Ex: primary_links,
1015 1
            #     backup_links,
1016 1
            #     current_links_cache,
1017 1
            #     primary_links_cache,
1018 1
            #     backup_links_cache
1019
            if "links" in attribute:
1020
                data[attribute] = [
1021
                    self._link_from_dict(link) for link in value
1022 1
                ]
1023 1
1024 1
            # Ex: current_path,
1025 1
            #     primary_path,
1026 1
            #     backup_path
1027 1
            if "path" in attribute and attribute != "dynamic_backup_path":
1028 1
                data[attribute] = Path(
1029 1
                    [self._link_from_dict(link) for link in value]
1030 1
                )
1031 1
1032 1
        return data
1033
1034 1
    def _evc_from_dict(self, evc_dict):
1035
        data = self._evc_dict_with_instances(evc_dict)
1036 1
        data["table_group"] = self.table_group
1037 1
        return EVC(self.controller, **data)
1038 1
1039
    def _uni_from_dict(self, uni_dict):
1040 1
        """Return a UNI object from python dict."""
1041
        if uni_dict is None:
1042 1
            return False
1043 1
1044
        interface_id = uni_dict.get("interface_id")
1045 1
        interface = self.controller.get_interface_by_id(interface_id)
1046 1
        if interface is None:
1047 1
            result = (
1048 1
                "Error creating UNI:"
1049 1
                + f"Could not instantiate interface {interface_id}"
1050 1
            )
1051
            raise ValueError(result) from ValueError
1052
        tag_convert = {1: "vlan"}
1053
        tag_dict = uni_dict.get("tag", None)
1054 1
        if tag_dict:
1055 1
            tag_type = tag_dict.get("tag_type")
1056 1
            tag_type = tag_convert.get(tag_type, tag_type)
1057
            tag_value = tag_dict.get("value")
1058 1
            if isinstance(tag_value, list):
1059 1
                tag_value = get_tag_ranges(tag_value)
1060 1
                mask_list = get_vlan_tags_and_masks(tag_value)
1061 1
                tag = TAGRange(tag_type, tag_value, mask_list)
1062
            else:
1063
                tag = TAG(tag_type, tag_value)
1064 1
        else:
1065 1
            tag = None
1066
        uni = UNI(interface, tag)
1067 1
        return uni
1068
1069
    def _link_from_dict(self, link_dict):
1070
        """Return a Link object from python dict."""
1071
        id_a = link_dict.get("endpoint_a").get("id")
1072
        id_b = link_dict.get("endpoint_b").get("id")
1073
1074 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
1075 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
1076 1
        if not endpoint_a:
1077
            error_msg = f"Could not get interface endpoint_a id {id_a}"
1078
            raise ValueError(error_msg)
1079 1
        if not endpoint_b:
1080 1
            error_msg = f"Could not get interface endpoint_b id {id_b}"
1081 1
            raise ValueError(error_msg)
1082 1
1083 1
        link = Link(endpoint_a, endpoint_b)
1084 1
        if "metadata" in link_dict:
1085 1
            link.extend_metadata(link_dict.get("metadata"))
1086 1
1087 1
        s_vlan = link.get_metadata("s_vlan")
1088
        if s_vlan:
1089 1
            tag = TAG.from_dict(s_vlan)
1090
            if tag is False:
1091
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
1092
                raise ValueError(error_msg)
1093
            link.update_metadata("s_vlan", tag)
1094
        return link
1095 1
1096
    def _find_evc_by_schedule_id(self, schedule_id):
1097 1
        """
1098 1
        Find an EVC and CircuitSchedule based on schedule_id.
1099 1
1100 1
        :param schedule_id: Schedule ID
1101 1
        :return: EVC and Schedule
1102
        """
1103
        circuits = self._get_circuits_buffer()
1104 1
        found_schedule = None
1105 1
        evc = None
1106
1107 1
        # pylint: disable=unused-variable
1108 1
        for c_id, circuit in circuits.items():
1109 1
            for schedule in circuit.circuit_scheduler:
1110 1
                if schedule.id == schedule_id:
1111 1
                    found_schedule = schedule
1112 1
                    evc = circuit
1113
                    break
1114
            if found_schedule:
1115 1
                break
1116 1
        return evc, found_schedule
1117 1
1118 1
    def _get_circuits_buffer(self):
1119 1
        """
1120
        Return the circuit buffer.
1121
1122
        If the buffer is empty, try to load data from mongodb.
1123
        """
1124
        if not self.circuits:
1125
            # Load circuits from mongodb to buffer
1126
            circuits = self.mongo_controller.get_circuits()['circuits']
1127
            for c_id, circuit in circuits.items():
1128
                evc = self._evc_from_dict(circuit)
1129
                self.circuits[c_id] = evc
1130
        return self.circuits
1131
1132
    # pylint: disable=attribute-defined-outside-init
1133
    @alisten_to("kytos/of_multi_table.enable_table")
1134
    async def on_table_enabled(self, event):
1135
        """Handle a recently table enabled."""
1136
        table_group = event.content.get("mef_eline", None)
1137
        if not table_group:
1138
            return
1139
        for group in table_group:
1140
            if group not in settings.TABLE_GROUP_ALLOWED:
1141
                log.error(f'The table group "{group}" is not allowed for '
1142
                          f'mef_eline. Allowed table groups are '
1143
                          f'{settings.TABLE_GROUP_ALLOWED}')
1144
                return
1145
        self.table_group.update(table_group)
1146
        content = {"group_table": self.table_group}
1147
        name = "kytos/mef_eline.enable_table"
1148
        await aemit_event(self.controller, name, content)
1149