Test Failed
Pull Request — master (#411)
by
unknown
03:40
created

build.main.Main.get_metadata()   A

Complexity

Conditions 2

Size

Total Lines 13
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 11
nop 2
dl 0
loc 13
ccs 6
cts 6
cp 1
crap 2
rs 9.85
c 0
b 0
f 0
1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from threading import Lock
10
11 1
from pydantic import ValidationError
12
13 1
from kytos.core import KytosNApp, log, rest
14 1
from kytos.core.events import KytosEvent
15 1
from kytos.core.exceptions import KytosTagError
16 1
from kytos.core.helpers import (alisten_to, listen_to, load_spec,
17
                                validate_openapi)
18 1
from kytos.core.interface import TAG, UNI, TAGRange
19 1
from kytos.core.link import Link
20 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
21
                                 get_json_or_400)
22 1
from kytos.core.tag_ranges import get_tag_ranges
23 1
from napps.kytos.mef_eline import controllers, settings
24 1
from napps.kytos.mef_eline.exceptions import DisabledSwitch, InvalidPath, DuplicatedNoTagUNI
25 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
26
                                          Path)
27 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
28 1
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
29
                                         emit_event, get_vlan_tags_and_masks,
30
                                         map_evc_event_content)
31
32
33
# pylint: disable=too-many-public-methods
34 1
class Main(KytosNApp):
35
    """Main class of amlight/mef_eline NApp.
36
37
    This class is the entry point for this napp.
38
    """
39
40 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
41
42 1
    def setup(self):
        """Initialise the NApp state; stands in for ``__init__``.

        The controller calls this method automatically when the application
        is loaded, so every start-up routine belongs here.
        """
        # In-memory buffer of the EVCs created so far. Every
        # create/update/delete must also be synced to mongodb.
        self.circuits = {}
        self.table_group = {"epl": 0, "evpl": 0}
        # Lock shared by the consistency routine and the interface handlers.
        self._lock = Lock()

        # Scheduler used to schedule circuit events.
        self.sched = Scheduler()

        # Controller used to save and load circuits.
        self.mongo_controller = self.get_eline_controller()
        self.mongo_controller.bootstrap_indexes()

        # Hand the Kytos controller to the manager of the dynamic paths.
        DynamicPathManager.set_controller(self.controller)

        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

        self.load_all_evcs()
        self._topology_updated_at = None
70
71 1
    def get_evcs_by_svc_level(self) -> list:
        """Return the buffered circuits ordered by priority.

        Circuits are ordered by descending service level and, within the
        same level, by ascending creation time.

        In the future, as more ops are offloaded, this should come from
        the DB.
        """
        def priority(evc):
            # Negating the level makes the ascending sort put the highest
            # service level first.
            return (-evc.service_level, evc.creation_time)

        ordered = list(self.circuits.values())
        ordered.sort(key=priority)
        return ordered
78
79 1
    @staticmethod
80 1
    def get_eline_controller():
        """Build and return a new ``ELineController`` instance."""
        eline_controller = controllers.ELineController()
        return eline_controller
83
84 1
    def execute(self):
        """Run the consistency routine unless one is already in progress.

        Nothing is done when ``self._lock`` is currently held; otherwise the
        routine runs while holding that lock.
        """
        if not self._lock.locked():
            log.debug("Starting consistency routine")
            with self._lock:
                self.execute_consistency()
            log.debug("Finished consistency routine")
92
93 1
    def should_be_checked(self, circuit):
        """Tell whether the circuit meets the conditions to be checked.

        Returns True only for a circuit that is enabled, inactive, not
        locked, without a recently removed flow, not recently updated and
        whose UNIs are active; additionally it must either be intra-switch
        or have a current path.
        """
        if not circuit.is_enabled():
            return False
        if circuit.is_active():
            return False
        if circuit.lock.locked():
            return False
        if circuit.has_recent_removed_flow():
            return False
        if circuit.is_recent_updated():
            return False
        if not circuit.are_unis_active(self.controller.switches):
            return False
        # If an inter-switch EVC does not have current_path, it does not
        # make sense to run sdntrace on it.
        if circuit.is_intra_switch():
            return True
        return bool(circuit.current_path)
109
110 1
    def execute_consistency(self):
        """Run one round of the consistency routine.

        Circuits selected by ``should_be_checked`` are traced in bulk:
        those whose trace succeeds are activated and synced, the others get
        their round counter bumped and are redeployed once it exceeds
        ``settings.WAIT_FOR_OLD_PATH``. Circuits stored in mongodb but
        missing from the in-memory buffer are loaded at the end.
        """
        # Whatever remains here after the first loop is stored in mongodb
        # but absent from self.circuits.
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
        circuits_to_check = []
        for circuit in self.get_evcs_by_svc_level():
            stored_circuits.pop(circuit.id, None)
            if self.should_be_checked(circuit):
                circuits_to_check.append(circuit)

        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
        for circuit in circuits_to_check:
            if circuits_checked.get(circuit.id):
                circuit.execution_rounds = 0
                log.info(f"{circuit} enabled but inactive - activating")
                with circuit.lock:
                    circuit.activate()
                    circuit.sync()
                continue

            circuit.execution_rounds += 1
            if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
                log.info(f"{circuit} enabled but inactive - redeploy")
                with circuit.lock:
                    circuit.deploy()

        for circuit_id, stored_circuit in stored_circuits.items():
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
            self._load_evc(stored_circuit)
136
137 1
    def shutdown(self):
        """Run when the NApp is unloaded.

        Any cleanup procedure belongs here; currently there is none.
        """
142
143 1
    @rest("/v2/evc/", methods=["GET"])
144 1
    def list_circuits(self, request: Request) -> JSONResponse:
        """Endpoint to return the stored circuits.

        The ``archived`` query argument (default ``"false"``) selects which
        circuits are listed, so by default only non-archived EVCs are
        returned. Every other query argument is forwarded as a metadata
        filter.
        """
        log.debug("list_circuits /v2/evc")
        query = request.query_params
        archived = query.get("archived", "false").lower()
        metadata_filter = {
            key: value for key, value in query.items() if key != "archived"
        }
        stored = self.mongo_controller.get_circuits(
            archived=archived, metadata=metadata_filter
        )
        return JSONResponse(stored['circuits'])
158
159 1
    @rest("/v2/evc/schedule", methods=["GET"])
160 1
    def list_schedules(self, _request: Request) -> JSONResponse:
        """Endpoint to return all schedules stored for all circuits.

        Return a JSON with the following template:
        [{"schedule_id": <schedule_id>,
         "circuit_id": <circuit_id>,
         "schedule": <schedule object>}]

        When no circuit is stored at all, an empty JSON object is returned.
        """
        log.debug("list_schedules /v2/evc/schedule")
        status = 200
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
        if not circuits:
            return JSONResponse({}, status_code=status)

        result = []
        for circuit in circuits:
            schedules = circuit.get("circuit_scheduler")
            if not schedules:
                continue
            result.extend(
                {
                    "schedule_id": scheduler.get("id"),
                    "circuit_id": circuit.get("id"),
                    "schedule": scheduler,
                }
                for scheduler in schedules
            )

        log.debug("list_schedules result %s %s", result, status)
        return JSONResponse(result, status_code=status)
190
191 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
192 1
    def get_circuit(self, request: Request) -> JSONResponse:
        """Endpoint to return a circuit based on its id.

        Raises:
            HTTPException: 404 when no circuit is found for the id.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("get_circuit /v2/evc/%s", circuit_id)
        circuit = self.mongo_controller.get_circuit(circuit_id)
        if circuit:
            status = 200
            log.debug("get_circuit result %s %s", circuit, status)
            return JSONResponse(circuit, status_code=status)

        result = f"circuit_id {circuit_id} not found"
        log.debug("get_circuit result %s %s", result, 404)
        raise HTTPException(404, detail=result)
204
205
    # pylint: disable=too-many-branches, too-many-statements
206 1
    @rest("/v2/evc/", methods=["POST"])
207 1
    @validate_openapi(spec)
208 1
    def create_circuit(self, request: Request) -> JSONResponse:
        """Try to create a new circuit.

        Outline of the provisioning flow:

        1. For EVPL, E-Line NApp verifies that the C-VIDs requested for
           UNI_A and UNI_Z are available from the interfaces' pools. This
           is checked when creating the UNI objects.
        2. E-Line NApp requests a primary and a backup path from the
           Pathfinder NApp using the attributes primary_links and
           backup_links submitted via REST.
        3. For each link composing those paths, E-Line NApp requests an
           S-VID available from the link VLAN pool and uses it to generate
           abstract flow entries to be sent to FlowManager.
        4. The abstract flow entries are pushed to FlowManager, which
           pushes OpenFlow entries to the datapaths.
        5. E-Line NApp generates an event to notify all Kytos NApps of the
           new EVC creation.
        6. Finally, the user is notified of the status of the request.

        Raises:
            HTTPException: 400 when the payload, the paths or the UNI tags
                are invalid; 409 when a disabled component is involved.
        """
        log.debug("create_circuit /v2/evc/")
        data = get_json_or_400(request, self.controller.loop)

        # Try to create the circuit object.
        try:
            evc = self._evc_from_dict(data)
        except (ValueError, KytosTagError) as exception:
            log.debug("create_circuit result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception

        try:
            check_disabled_component(evc.uni_a, evc.uni_z)
        except DisabledSwitch as exception:
            log.debug("create_circuit result %s %s", exception, 409)
            raise HTTPException(
                409,
                detail=f"Path is not valid: {exception}"
            ) from exception

        # Arguments shared by the validation of both static paths.
        switch_a = evc.uni_a.interface.switch
        switch_z = evc.uni_z.interface.switch
        is_scheduled = bool(evc.circuit_scheduler)

        if evc.primary_path:
            try:
                evc.primary_path.is_valid(switch_a, switch_z, is_scheduled)
            except InvalidPath as exception:
                raise HTTPException(
                    400,
                    detail=f"primary_path is not valid: {exception}"
                ) from exception
        if evc.backup_path:
            try:
                evc.backup_path.is_valid(switch_a, switch_z, is_scheduled)
            except InvalidPath as exception:
                raise HTTPException(
                    400,
                    detail=f"backup_path is not valid: {exception}"
                ) from exception

        if not evc._tag_lists_equal():
            detail = "UNI_A and UNI_Z tag lists should be the same."
            raise HTTPException(400, detail=detail)

        try:
            evc._validate_has_primary_or_dynamic()
        except ValueError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        try:
            evc_data = {"id": evc.id, "uni_a": evc.uni_a, "uni_z": evc.uni_z}
            self._check_no_tag_duplication(evc_data)
        except DuplicatedNoTagUNI as exception:
            log.debug("create_circuit result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception

        try:
            self._use_uni_tags(evc)
        except KytosTagError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        # Save the circuit.
        try:
            evc.sync()
        except ValidationError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        # Store the circuit in the in-memory dictionary.
        self.circuits[evc.id] = evc

        # Schedule the circuit deploy.
        self.sched.add(evc)

        # A circuit without a schedule is deployed right away.
        if not evc.circuit_scheduler:
            with evc.lock:
                evc.deploy()

        # Notify users.
        result = {"circuit_id": evc.id}
        status = 201
        log.debug("create_circuit result %s %s", result, status)
        emit_event(self.controller, name="created",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)
320 1
321 1
    @staticmethod
322 1
    def _use_uni_tags(evc):
        """Reserve the VLAN of both UNIs of the EVC.

        When reserving for UNI_Z fails with ``KytosTagError``, the VLAN
        already taken for UNI_A is made available again before the error
        is re-raised.
        """
        first_uni = evc.uni_a
        evc._use_uni_vlan(first_uni)
        try:
            evc._use_uni_vlan(evc.uni_z)
        except KytosTagError as tag_error:
            # Roll back the UNI_A reservation so no tag stays in use.
            evc.make_uni_vlan_available(first_uni)
            raise tag_error
331
332 1
    @listen_to('kytos/flow_manager.flow.removed')
333 1
    def on_flow_delete(self, event):
        """Listen for removed flows and hand the event to the handler.

        The actual bookkeeping is done by ``handle_flow_delete``.
        """
        self.handle_flow_delete(event)
336 1
337
    def handle_flow_delete(self, event):
        """Record the flow removal on the EVC derived from the flow cookie.

        Nothing happens when the cookie does not map to a buffered EVC.
        """
        cookie = event.content["flow"].cookie
        evc = self.circuits.get(EVC.get_id_from_cookie(cookie))
        if not evc:
            return
        log.debug("Flow removed in EVC %s", evc.id)
        evc.set_flow_removed_at()
344
345
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
346 1
    @validate_openapi(spec)
347 1
    def update(self, request: Request) -> JSONResponse:
        """Update a circuit based on the request payload.

        The EVC attributes (creation_time, active, current_path,
        failover_path, _id, archived) can't be updated.

        Raises:
            HTTPException: 404 when the circuit is unknown, 409 when it is
                archived or a disabled component is involved, 400 when the
                payload is invalid.
        """
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = request.path_params["circuit_id"]
        log.debug("update /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("update result %s %s", result, 404)
            # Chain from the caught instance so the real cause is kept.
            raise HTTPException(404, detail=result) from error

        if evc.archived:
            result = "Can't update archived EVC"
            log.debug("update result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        try:
            updated_data = self._evc_dict_with_instances(data)
            self._check_no_tag_duplication(dict(updated_data, id=circuit_id))
            enable, redeploy = evc.update(**updated_data)
        except (
            ValueError, KytosTagError,
            ValidationError, DuplicatedNoTagUNI
        ) as exception:
            log.debug("update result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception
        except DisabledSwitch as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(
                409,
                detail=f"Path is not valid: {exception}"
            ) from exception

        if evc.is_active():
            if enable is False:  # disable if active
                with evc.lock:
                    evc.remove()
            elif redeploy is not None:  # redeploy if active
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        elif enable is True:  # enable if inactive
            with evc.lock:
                evc.deploy()

        result = {evc.id: evc.as_dict()}
        status = 200

        log.debug("update result %s %s", result, status)
        emit_event(self.controller, "updated",
                   content=map_evc_event_content(evc, **data))
        return JSONResponse(result, status_code=status)
404 1
405 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
406 1
    def delete_circuit(self, request: Request) -> JSONResponse:
        """Remove a circuit.

        First, the flows are removed from the switches, and then the EVC is
        disabled, archived and synced.

        Raises:
            HTTPException: 404 when the circuit is unknown or was already
                removed (archived).
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("delete_circuit result %s %s", result, 404)
            # Chain from the caught instance so the real cause is kept.
            raise HTTPException(404, detail=result) from error

        if evc.archived:
            result = f"Circuit {circuit_id} already removed"
            log.debug("delete_circuit result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        log.info("Removing %s", evc)
        with evc.lock:
            evc.remove_current_flows()
            evc.remove_failover_flows(sync=False)
            evc.deactivate()
            evc.disable()
            self.sched.remove(evc)
            evc.archive()
            evc.remove_uni_tags()
            evc.sync()
        log.info("EVC removed. %s", evc)
        result = {"response": f"Circuit {circuit_id} removed"}
        status = 200

        log.debug("delete_circuit result %s %s", result, status)
        emit_event(self.controller, "deleted",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)
444
445
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
446
    def get_metadata(self, request: Request) -> JSONResponse:
        """Return the metadata of an EVC.

        Raises:
            HTTPException: 404 when the circuit id is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        try:
            metadata = self.circuits[circuit_id].metadata
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error
        return JSONResponse({"metadata": metadata})
458 1
459 1 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
460 1
    @validate_openapi(spec)
461 1
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to a bulk of EVCs.

        The stored circuits are updated first and then the buffered EVCs.

        Raises:
            HTTPException: 404 listing the ids that raised ``KeyError``
                while the buffered EVCs were updated.
        """
        payload = get_json_or_400(request, self.controller.loop)
        circuit_ids = payload.pop("circuit_ids")

        # What is left in the payload after the pop is the metadata.
        self.mongo_controller.update_evcs(circuit_ids, payload, "add")

        missing_ids = []
        for evc_id in circuit_ids:
            try:
                self.circuits[evc_id].extend_metadata(payload)
            except KeyError:
                missing_ids.append(evc_id)

        if missing_ids:
            raise HTTPException(404, detail=missing_ids)
        return JSONResponse("Operation successful", status_code=201)
479 1
480 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
481 1
    @validate_openapi(spec)
482
    def add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to an EVC.

        Raises:
            HTTPException: 400 when the payload is not a JSON object, 404
                when the circuit id is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        metadata = get_json_or_400(request, self.controller.loop)
        if not isinstance(metadata, dict):
            # The f-prefix was missing, so the placeholder was sent as-is.
            raise HTTPException(
                400,
                detail=f"Invalid metadata value: {metadata}"
            )
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error

        evc.extend_metadata(metadata)
        evc.sync()
        return JSONResponse("Operation successful", status_code=201)
499 1
500 1 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
501 1
    @validate_openapi(spec)
502 1
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
        """Delete a metadata key from a bulk of EVCs.

        The stored circuits are updated first and then the buffered EVCs.

        Raises:
            HTTPException: 404 listing the ids that raised ``KeyError``
                while the buffered EVCs were updated.
        """
        payload = get_json_or_400(request, self.controller.loop)
        key = request.path_params["key"]
        circuit_ids = payload.pop("circuit_ids")
        self.mongo_controller.update_evcs(circuit_ids, {key: ""}, "del")

        missing_ids = []
        for evc_id in circuit_ids:
            try:
                self.circuits[evc_id].remove_metadata(key)
            except KeyError:
                missing_ids.append(evc_id)

        if missing_ids:
            raise HTTPException(404, detail=missing_ids)
        return JSONResponse("Operation successful")
520
521
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
522
    def delete_metadata(self, request: Request) -> JSONResponse:
        """Delete a metadata key from an EVC and sync the EVC.

        Raises:
            HTTPException: 404 when the circuit id is not in the buffer.
        """
        params = request.path_params
        circuit_id = params["circuit_id"]
        key = params["key"]
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error

        evc.remove_metadata(key)
        evc.sync()
        return JSONResponse("Operation successful")
537
538
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
539
    def redeploy(self, request: Request) -> JSONResponse:
        """Endpoint to force the redeployment of an EVC.

        An enabled EVC has its current flows removed and is deployed again
        (202). A disabled EVC is left untouched and 409 is returned.

        Raises:
            HTTPException: 404 when the circuit id is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            # Chain from the caught instance so the real cause is kept.
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found"
            ) from error

        if evc.is_enabled():
            with evc.lock:
                evc.remove_current_flows()
                evc.deploy()
            result = {"response": f"Circuit {circuit_id} redeploy received."}
            status = 202
        else:
            result = {"response": f"Circuit {circuit_id} is disabled."}
            status = 409

        return JSONResponse(result, status_code=status)
561
562
    @rest("/v2/evc/schedule/", methods=["POST"])
563
    @validate_openapi(spec)
564
    def create_schedule(self, request: Request) -> JSONResponse:
        """
        Create a new schedule for a given circuit.

        This service does not check whether there are conflicts with
        another schedule.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }

        Raises:
            HTTPException: 404 when the circuit is not found, 409 when it
                is archived.
        """
        log.debug("create_schedule /v2/evc/schedule/")
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = data["circuit_id"]
        schedule_data = data["schedule"]

        # Look the EVC up in the circuits buffer.
        evc = self._get_circuits_buffer().get(circuit_id)
        if not evc:
            result = f"circuit_id {circuit_id} not found"
            log.debug("create_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # Deleted and archived circuits can not be modified.
        if evc.archived:
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
            log.debug("create_schedule result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        # Build the new schedule from the payload.
        new_schedule = CircuitSchedule.from_dict(schedule_data)

        # Create the list when the EVC has no schedule yet, then add to it.
        if not evc.circuit_scheduler:
            evc.circuit_scheduler = []
        evc.circuit_scheduler.append(new_schedule)

        # Register the schedule job.
        self.sched.add_circuit_job(evc, new_schedule)

        # Save the circuit to mongodb.
        evc.sync()

        result = new_schedule.as_dict()
        status = 201

        log.debug("create_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)
623
624
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
625
    @validate_openapi(spec)
626
    def update_schedule(self, request: Request) -> JSONResponse:
        """Update a schedule.

        Replace all attributes of the given schedule of an EVC circuit.
        The schedule ID is preserved.
        Payload example:
            {
              "date": "2019-08-07T14:52:10.967Z",
              "interval": "string",
              "frequency": "1 * * * *",
              "action": "create"
            }

        Raises:
            HTTPException: 404 when the schedule is not found, 409 when
                its circuit is archived.
        """
        data = get_json_or_400(request, self.controller.loop)
        schedule_id = request.path_params["schedule_id"]
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)

        # Try to find the circuit that owns the schedule.
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
        if not found_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("update_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # Deleted and archived circuits can not be modified.
        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("update_schedule result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        new_schedule = CircuitSchedule.from_dict(data)
        new_schedule.id = found_schedule.id

        # Swap the old schedule for the modified one.
        evc.circuit_scheduler.remove(found_schedule)
        evc.circuit_scheduler.append(new_schedule)

        # Cancel the jobs of the old schedule and register the new one.
        self.sched.cancel_job(found_schedule.id)
        self.sched.add_circuit_job(evc, new_schedule)

        # Save the EVC to mongodb.
        evc.sync()

        result = new_schedule.as_dict()
        status = 200

        log.debug("update_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)
675 1
676 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
677
    def delete_schedule(self, request: Request) -> JSONResponse:
        """Remove a circuit schedule.

        Remove the schedule from the EVC.
        Cancel the scheduled job.
        Save the EVC to mongodb.

        Raises:
            HTTPException: 404 when the schedule is not found, 409 when
                its circuit is archived.
        """
        schedule_id = request.path_params["schedule_id"]
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)

        if not found_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("delete_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # Deleted and archived circuits can not be modified.
        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("delete_schedule result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        # Drop the schedule from the EVC and cancel its jobs.
        evc.circuit_scheduler.remove(found_schedule)
        self.sched.cancel_job(found_schedule.id)

        # Save the EVC to mongodb.
        evc.sync()

        result = "Schedule removed"
        status = 200

        log.debug("delete_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)
712 1
713 1
    def _check_no_tag_duplication(self, evc_dict: dict):
714 1
        """Check if the given EVC has UNIs with no tag and if these are
715
         duplicated. Raise DuplicatedNoTagUNI if duplication is found.
716
        Args:
717 1
            evc (dict): EVC to be analyzed.
718
        """
719
        uni_a, uni_z = evc_dict.get("uni_a"), evc_dict.get("uni_z")
720
721 1
        # No UNIs
722
        if not (uni_a or uni_z):
723
            return
724
725
        if (not (uni_a and not uni_a.user_tag) and
726
                not (uni_z and not uni_z.user_tag)):
727
            return
728
        for circuit in self.circuits.values():
729
            if (not circuit.archived and circuit._id != evc_dict["id"]):
730
                if uni_a and uni_a.user_tag is None:
731
                    circuit.check_no_tag_duplicate(uni_a)
732
                if uni_z and uni_z.user_tag is None:
733 1
                    circuit.check_no_tag_duplicate(uni_z)
734
735
    @listen_to("kytos/topology.link_up")
    def on_link_up(self, event):
        """Listen to kytos/topology.link_up and delegate to handle_link_up.

        Args:
            event: the received event, forwarded untouched.
        """
        self.handle_link_up(event)
739
740
    def handle_link_up(self, event):
        """Forward a link up to every enabled, non-archived EVC.

        Args:
            event: event whose ``content["link"]`` is the link that came up.
        """
        link = event.content["link"]
        log.info("Event handle_link_up %s", link)
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_enabled() or evc.archived:
                continue
            # Each EVC is handled while holding its own lock.
            with evc.lock:
                evc.handle_link_up(link)
747
748
    # Possibly replace this with interruptions?
749
    @listen_to(
        '.*.switch.interface.(link_up|link_down|created|deleted)',
        pool='dynamic_single'
    )
    def on_interface_link_change(self, event: KytosEvent):
        """
        Handler for interface link_up, link_down, created and deleted events

        The last dot-separated piece of the event name selects the action:
        link_up/created go to handle_interface_link_up, link_down/deleted go
        to handle_interface_link_down. Work is done while holding self._lock.
        """
        with self._lock:
            event_type = event.name.rpartition('.')[2]
            iface = event.content.get("interface")
            if event_type in {'link_up', 'created'}:
                self.handle_interface_link_up(iface)
            elif event_type in {'link_down', 'deleted'}:
                self.handle_interface_link_down(iface)
764 1
765 1
    def handle_interface_link_up(self, interface):
        """
        Handler for interface link_up events

        Args:
            interface: the interface that went up; it is passed to
                ``handle_interface_link_up`` of every EVC returned by
                ``get_evcs_by_svc_level``.
        """
        # Log once per event instead of once per EVC: the message does not
        # depend on the EVC being processed.
        log.info("Event handle_interface_link_up %s", interface)
        for evc in self.get_evcs_by_svc_level():
            evc.handle_interface_link_up(interface)
774 1
775 1
    def handle_interface_link_down(self, interface):
        """
        Handler for interface link_down events

        Args:
            interface: the interface that went down; it is passed to
                ``handle_interface_link_down`` of every EVC returned by
                ``get_evcs_by_svc_level``.
        """
        # Log once per event instead of once per EVC: the message does not
        # depend on the EVC being processed.
        log.info("Event handle_interface_link_down %s", interface)
        for evc in self.get_evcs_by_svc_level():
            evc.handle_interface_link_down(interface)
784 1
785 1
    @listen_to("kytos/topology.link_down")
    def on_link_down(self, event):
        """Listen to kytos/topology.link_down and call handle_link_down.

        Args:
            event: the received event, forwarded untouched.
        """
        self.handle_link_down(event)
789 1
790
    def handle_link_down(self, event):  # pylint: disable=too-many-branches
        """Change circuits when a link goes down.

        EVCs affected by the link are split in two groups: those that can
        switch to their failover path (its flows are installed in batches
        and current/failover paths are swapped) and those that need the
        regular handling (an "evc_affected_by_link_down" event is emitted
        for each one). EVCs not affected by the link only get their failover
        path set up again when that path uses the link.

        Args:
            event: event whose ``content["link"]`` is the link that is down.
        """
        link = event.content["link"]
        log.info("Event handle_link_down %s", link)
        switch_flows = {}
        evcs_with_failover = []
        evcs_normal = []
        check_failover = []
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_affected_by_link(link):
                check_failover.append(evc)
                continue

            # Without a usable failover path, handle the link down the
            # traditional way.
            failover_usable = (
                getattr(evc, 'failover_path', None) and
                not evc.is_failover_path_affected_by_link(link)
            )
            if not failover_usable:
                evcs_normal.append(evc)
                continue

            try:
                dpid_flows = evc.get_failover_flows()
            # pylint: disable=broad-except
            except Exception:
                err = traceback.format_exc().replace("\n", ", ")
                log.error(
                    f"Ignore Failover path for {evc} due to error: {err}"
                )
                evcs_normal.append(evc)
                continue

            for dpid, flows in dpid_flows.items():
                switch_flows.setdefault(dpid, []).extend(flows)
            evcs_with_failover.append(evc)

        # Send the failover flows, at most BATCH_SIZE per switch on each
        # round (everything at once when BATCH_SIZE is falsy), sleeping
        # BATCH_INTERVAL after every round.
        while switch_flows:
            offset = settings.BATCH_SIZE or None
            for dpid in list(switch_flows):
                pending = switch_flows[dpid]
                emit_event(
                    self.controller,
                    context="kytos.flow_manager",
                    name="flows.install",
                    content={
                        "dpid": dpid,
                        "flow_dict": {"flows": pending[:offset]},
                    }
                )
                if offset is None or offset >= len(pending):
                    del switch_flows[dpid]
                else:
                    switch_flows[dpid] = pending[offset:]
            time.sleep(settings.BATCH_INTERVAL)

        for evc in evcs_with_failover:
            with evc.lock:
                # Swap current and failover paths, then persist the EVC.
                previous_path = evc.current_path
                evc.current_path = evc.failover_path
                evc.failover_path = previous_path
                evc.sync()
            emit_event(
                self.controller,
                "redeployed_link_down",
                content=map_evc_event_content(evc),
            )
            log.info(
                f"{evc} redeployed with failover due to link down {link.id}"
            )

        for evc in evcs_normal:
            emit_event(
                self.controller,
                "evc_affected_by_link_down",
                content={"link_id": link.id} | map_evc_event_content(evc)
            )

        # After handling the hot path, check if new failover paths are needed.
        # Note that EVCs affected by link down will generate a KytosEvent for
        # deployed|redeployed, which will trigger the failover path setup.
        # Thus, we just need to further check the check_failover list
        for evc in check_failover:
            if not evc.is_failover_path_affected_by_link(link):
                continue
            with evc.lock:
                evc.setup_failover_path()
871 1
872
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
    def on_evc_affected_by_link_down(self, event):
        """Listen to evc_affected_by_link_down and delegate its handling.

        Args:
            event: the received event, forwarded untouched to
                handle_evc_affected_by_link_down.
        """
        self.handle_evc_affected_by_link_down(event)
876
877
    def handle_evc_affected_by_link_down(self, event):
878 1
        """Change circuit when link is down or under_mantenance."""
879
        evc = self.circuits.get(event.content["evc_id"])
880 1
        link_id = event.content['link_id']
881 1
        if not evc:
882 1
            return
883 1
        with evc.lock:
884 1
            result = evc.handle_link_down()
885
        event_name = "error_redeploy_link_down"
886 1
        if result:
887
            log.info(f"{evc} redeployed due to link down {link_id}")
888 1
            event_name = "redeployed_link_down"
889 1
        emit_event(self.controller, event_name,
890 1
                   content=map_evc_event_content(evc))
891 1
892
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
    def on_evc_deployed(self, event):
        """Handle EVC deployed|redeployed_link_up|redeployed_link_down.

        Args:
            event: the received event, forwarded untouched to
                handle_evc_deployed.
        """
        self.handle_evc_deployed(event)
896 1
897
    def handle_evc_deployed(self, event):
898 1
        """Setup failover path on evc deployed."""
899 1
        evc = self.circuits.get(event.content["evc_id"])
900 1
        if not evc:
901
            return
902 1
        with evc.lock:
903 1
            evc.setup_failover_path()
904
905
    @listen_to("kytos/topology.topology_loaded")
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
        """Load all EVCs when kytos/topology.topology_loaded is received.

        Args:
            event: the received event; its content is not used.
        """
        self.load_all_evcs()
909 1
910 1
    def load_all_evcs(self):
        """Load every stored EVC that is not in memory yet.

        The circuits come from ``mongo_controller.get_circuits()``. Once the
        loop is done, an "evcs_loaded" event carrying all of them is emitted.
        """
        stored = self.mongo_controller.get_circuits()['circuits']
        for circuit_id, circuit in stored.items():
            if circuit_id in self.circuits:
                continue
            self._load_evc(circuit)
        emit_event(self.controller, "evcs_loaded", content=dict(stored))
917
918 1
    def _load_evc(self, circuit_dict):
919
        """Load one EVC from mongodb to memory."""
920
        try:
921
            evc = self._evc_from_dict(circuit_dict)
922
        except (ValueError, KytosTagError) as exception:
923 1
            log.error(
924 1
                f"Could not load EVC: dict={circuit_dict} error={exception}"
925
            )
926
            return None
927 1
        if evc.archived:
928 1
            return None
929 1
930 1
        self.circuits.setdefault(evc.id, evc)
931 1
        self.sched.add(evc)
932 1
        return evc
933
934 1
    @listen_to("kytos/flow_manager.flow.error")
    def on_flow_mod_error(self, event):
        """Listen to flow_manager flow errors and delegate their handling.

        Args:
            event: the received event, forwarded untouched to
                handle_flow_mod_error.
        """
        self.handle_flow_mod_error(event)
938
939
    def handle_flow_mod_error(self, event):
940
        """Handle flow mod errors related to an EVC."""
941
        flow = event.content["flow"]
942
        command = event.content.get("error_command")
943
        if command != "add":
944
            return
945 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
946 1
        if evc:
947
            with evc.lock:
948
                evc.remove_current_flows()
949
950
    def _evc_dict_with_instances(self, evc_dict):
951
        """Convert some dict values to instance of EVC classes.
952
953 1
        This method will convert: [UNI, Link]
954 1
        """
955
        data = evc_dict.copy()  # Do not modify the original dict
956
        for attribute, value in data.items():
957
            # Get multiple attributes.
958 1
            # Ex: uni_a, uni_z
959
            if "uni" in attribute:
960 1
                try:
961 1
                    data[attribute] = self._uni_from_dict(value)
962 1
                except ValueError as exception:
963 1
                    result = "Error creating UNI: Invalid value"
964
                    raise ValueError(result) from exception
965 1
966
            if attribute == "circuit_scheduler":
967 1
                data[attribute] = []
968 1
                for schedule in value:
969
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
970 1
971 1
            # Get multiple attributes.
972 1
            # Ex: primary_links,
973 1
            #     backup_links,
974
            #     current_links_cache,
975
            #     primary_links_cache,
976
            #     backup_links_cache
977 1
            if "links" in attribute:
978 1
                data[attribute] = [
979 1
                    self._link_from_dict(link) for link in value
980 1
                ]
981 1
982 1
            # Ex: current_path,
983 1
            #     primary_path,
984 1
            #     backup_path
985 1
            if "path" in attribute and attribute != "dynamic_backup_path":
986 1
                data[attribute] = Path(
987 1
                    [self._link_from_dict(link) for link in value]
988
                )
989 1
990
        return data
991 1
992 1
    def _evc_from_dict(self, evc_dict):
        """Return an EVC built from a python dict.

        The dict goes through _evc_dict_with_instances first and the
        "table_group" entry is set to self.table_group.
        """
        evc_kwargs = {
            **self._evc_dict_with_instances(evc_dict),
            "table_group": self.table_group,
        }
        return EVC(self.controller, **evc_kwargs)
996
997 1
    def _uni_from_dict(self, uni_dict):
998 1
        """Return a UNI object from python dict."""
999
        if uni_dict is None:
1000 1
            return False
1001 1
1002 1
        interface_id = uni_dict.get("interface_id")
1003 1
        interface = self.controller.get_interface_by_id(interface_id)
1004 1
        if interface is None:
1005 1
            result = (
1006
                "Error creating UNI:"
1007
                + f"Could not instantiate interface {interface_id}"
1008
            )
1009 1
            raise ValueError(result) from ValueError
1010 1
        tag_convert = {1: "vlan"}
1011 1
        tag_dict = uni_dict.get("tag", None)
1012
        if tag_dict:
1013 1
            tag_type = tag_dict.get("tag_type")
1014 1
            tag_type = tag_convert.get(tag_type, tag_type)
1015 1
            tag_value = tag_dict.get("value")
1016 1
            if isinstance(tag_value, list):
1017
                tag_value = get_tag_ranges(tag_value)
1018
                mask_list = get_vlan_tags_and_masks(tag_value)
1019 1
                tag = TAGRange(tag_type, tag_value, mask_list)
1020 1
            else:
1021
                tag = TAG(tag_type, tag_value)
1022 1
        else:
1023
            tag = None
1024
        uni = UNI(interface, tag)
1025
        return uni
1026
1027
    def _link_from_dict(self, link_dict):
1028
        """Return a Link object from python dict."""
1029 1
        id_a = link_dict.get("endpoint_a").get("id")
1030 1
        id_b = link_dict.get("endpoint_b").get("id")
1031 1
1032
        endpoint_a = self.controller.get_interface_by_id(id_a)
1033
        endpoint_b = self.controller.get_interface_by_id(id_b)
1034 1
        if not endpoint_a:
1035 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
1036 1
            raise ValueError(error_msg)
1037 1
        if not endpoint_b:
1038 1
            error_msg = f"Could not get interface endpoint_b id {id_b}"
1039 1
            raise ValueError(error_msg)
1040 1
1041 1
        link = Link(endpoint_a, endpoint_b)
1042 1
        if "metadata" in link_dict:
1043
            link.extend_metadata(link_dict.get("metadata"))
1044 1
1045
        s_vlan = link.get_metadata("s_vlan")
1046
        if s_vlan:
1047
            tag = TAG.from_dict(s_vlan)
1048
            if tag is False:
1049
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
1050 1
                raise ValueError(error_msg)
1051
            link.update_metadata("s_vlan", tag)
1052 1
        return link
1053 1
1054 1
    def _find_evc_by_schedule_id(self, schedule_id):
1055 1
        """
1056 1
        Find an EVC and CircuitSchedule based on schedule_id.
1057
1058
        :param schedule_id: Schedule ID
1059 1
        :return: EVC and Schedule
1060 1
        """
1061
        circuits = self._get_circuits_buffer()
1062 1
        found_schedule = None
1063 1
        evc = None
1064 1
1065 1
        # pylint: disable=unused-variable
1066 1
        for c_id, circuit in circuits.items():
1067 1
            for schedule in circuit.circuit_scheduler:
1068
                if schedule.id == schedule_id:
1069
                    found_schedule = schedule
1070 1
                    evc = circuit
1071 1
                    break
1072 1
            if found_schedule:
1073 1
                break
1074 1
        return evc, found_schedule
1075
1076
    def _get_circuits_buffer(self):
1077
        """
1078
        Return the circuit buffer.
1079
1080
        If the buffer is empty, try to load data from mongodb.
1081
        """
1082
        if not self.circuits:
1083
            # Load circuits from mongodb to buffer
1084
            circuits = self.mongo_controller.get_circuits()['circuits']
1085
            for c_id, circuit in circuits.items():
1086
                evc = self._evc_from_dict(circuit)
1087
                self.circuits[c_id] = evc
1088
        return self.circuits
1089
1090
    # pylint: disable=attribute-defined-outside-init
1091
    @alisten_to("kytos/of_multi_table.enable_table")
    async def on_table_enabled(self, event):
        """Handle a recently enabled table.

        The "mef_eline" entry of the event content is merged into
        self.table_group and a "kytos/mef_eline.enable_table" event is then
        emitted. Nothing changes when the entry is missing or empty, or when
        any of its groups is not in settings.TABLE_GROUP_ALLOWED (that case
        is logged as an error).
        """
        requested_groups = event.content.get("mef_eline", None)
        if not requested_groups:
            return

        # Validate every group before touching self.table_group.
        for group in requested_groups:
            if group not in settings.TABLE_GROUP_ALLOWED:
                log.error(f'The table group "{group}" is not allowed for '
                          f'mef_eline. Allowed table groups are '
                          f'{settings.TABLE_GROUP_ALLOWED}')
                return

        self.table_group.update(requested_groups)
        await aemit_event(
            self.controller,
            "kytos/mef_eline.enable_table",
            {"group_table": self.table_group},
        )
1107