Passed
Push — master ( 2baa92...7a5266 )
by Vinicius
03:01 queued 27s
created

build.main.Main.create_circuit()   F

Complexity

Conditions 14

Size

Total Lines 113
Code Lines 71

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 50
CRAP Score 14.1472

Importance

Changes 0
Metric Value
cc 14
eloc 71
nop 2
dl 0
loc 113
ccs 50
cts 55
cp 0.9091
crap 14.1472
rs 3.469
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

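Commonly applied refactorings include Extract Method. If many parameters or temporary variables are present, Replace Temp with Query, Introduce Parameter Object, and Preserve Whole Object also help.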

Complexity

Complex code like build.main.Main.create_circuit() often does a lot of different things. To break the enclosing class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from threading import Lock
10 1
from typing import Optional
11
12 1
from pydantic import ValidationError
13
14 1
from kytos.core import KytosNApp, log, rest
15 1
from kytos.core.events import KytosEvent
16 1
from kytos.core.exceptions import KytosTagError
17 1
from kytos.core.helpers import (alisten_to, listen_to, load_spec,
18
                                validate_openapi)
19 1
from kytos.core.interface import TAG, UNI, TAGRange
20 1
from kytos.core.link import Link
21 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
22
                                 get_json_or_400)
23 1
from kytos.core.tag_ranges import get_tag_ranges
24 1
from napps.kytos.mef_eline import controllers, settings
25 1
from napps.kytos.mef_eline.exceptions import (DisabledSwitch,
26
                                              DuplicatedNoTagUNI, InvalidPath)
27 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
28
                                          Path)
29 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
30 1
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
31
                                         emit_event, get_vlan_tags_and_masks,
32
                                         map_evc_event_content)
33
34
35
# pylint: disable=too-many-public-methods
36 1
class Main(KytosNApp):
37
    """Main class of amlight/mef_eline NApp.
38
39
    This class is the entry point for this napp.
40
    """
41
42 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
43
44 1
    def setup(self):
        """Initialise the NApp state; stands in for '__init__' on KytosNApp.

        The controller calls this method automatically when the application
        is loaded, so every start-up routine belongs here.
        """
        # Scheduler that drives the circuit events.
        self.sched = Scheduler()

        # Persistence layer used to save and load circuits.
        self.mongo_controller = self.get_eline_controller()
        self.mongo_controller.bootstrap_indexes()

        # Hand the controller over to the manager of the dynamic paths.
        DynamicPathManager.set_controller(self.controller)

        # Buffer of the EVCs created so far. Every create/update/delete
        # must also be synced to mongodb.
        self.circuits = {}

        self.table_group = dict(epl=0, evpl=0)
        self._lock = Lock()
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

        self.load_all_evcs()
        self._topology_updated_at = None
72
73 1
    def get_evcs_by_svc_level(self) -> list:
        """Return the buffered circuits in processing order.

        Circuits with a higher ``service_level`` come first; ties are
        broken by the earlier ``creation_time``.

        In the future, as more ops are offloaded, this should come from the
        DB.
        """
        def priority(evc):
            # Negated level gives descending order under an ascending sort.
            return (-evc.service_level, evc.creation_time)

        ordered = list(self.circuits.values())
        ordered.sort(key=priority)
        return ordered
80
81 1
    @staticmethod
    def get_eline_controller():
        """Build and return an ELineController instance."""
        eline_controller = controllers.ELineController()
        return eline_controller
85
86 1
    def execute(self):
        """Run one consistency pass unless one is already in progress.

        ``setup`` registers this method to run in a loop, so a pass that is
        still holding ``self._lock`` makes the next call return immediately.
        """
        if not self._lock.locked():
            log.debug("Starting consistency routine")
            with self._lock:
                self.execute_consistency()
            log.debug("Finished consistency routine")
94
95 1
    def should_be_checked(self, circuit):
        """Tell whether the circuit meets every condition to be checked.

        The circuit must be enabled, inactive, unlocked, without a recently
        removed flow or a recent update, and have active UNIs.

        Returns:
            bool: True only when all the conditions hold.
        """
        if not circuit.is_enabled():
            return False
        if circuit.is_active():
            return False
        if circuit.lock.locked():
            return False
        if circuit.has_recent_removed_flow():
            return False
        if circuit.is_recent_updated():
            return False
        if not circuit.are_unis_active(self.controller.switches):
            return False
        # if a inter-switch EVC does not have current_path, it does not
        # make sense to run sdntrace on it
        return bool(circuit.is_intra_switch() or circuit.current_path)
111
112 1
    def execute_consistency(self):
        """Execute the consistency routine.

        Circuits accepted by ``should_be_checked`` are traced. Those whose
        trace succeeds are activated; the others are redeployed once they
        exceed ``settings.WAIT_FOR_OLD_PATH`` failed rounds. Circuits stored
        in mongodb but missing from the local buffer are loaded at the end.
        """
        unloaded = self.mongo_controller.get_circuits()['circuits']
        candidates = []
        for evc in self.get_evcs_by_svc_level():
            # Whatever is left in ``unloaded`` after this loop exists in
            # mongodb but not in the local buffer.
            unloaded.pop(evc.id, None)
            if self.should_be_checked(evc):
                candidates.append(evc)

        trace_results = EVCDeploy.check_list_traces(candidates)
        for evc in candidates:
            if trace_results.get(evc.id):
                evc.execution_rounds = 0
                log.info(f"{evc} enabled but inactive - activating")
                with evc.lock:
                    evc.activate()
                    evc.sync()
                continue

            evc.execution_rounds += 1
            if evc.execution_rounds > settings.WAIT_FOR_OLD_PATH:
                log.info(f"{evc} enabled but inactive - redeploy")
                with evc.lock:
                    evc.deploy()

        for circuit_id, stored_evc in unloaded.items():
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
            self._load_evc(stored_evc)
138
139 1
    def shutdown(self):
        """Run when the napp is unloaded.

        Any cleanup procedure belongs here; there is none at the moment.
        """
144
145 1
    @rest("/v2/evc/", methods=["GET"])
    def list_circuits(self, request: Request) -> JSONResponse:
        """Endpoint to return the stored circuits.

        The ``archived`` query arg selects which circuits are listed; it
        defaults to "false", so only non archived EVCs are returned. Every
        other query arg is forwarded as a metadata filter.
        """
        log.debug("list_circuits /v2/evc")
        query = request.query_params
        archived = query.get("archived", "false").lower()
        metadata_filter = {
            key: value for key, value in query.items() if key != "archived"
        }
        found = self.mongo_controller.get_circuits(archived=archived,
                                                   metadata=metadata_filter)
        return JSONResponse(found['circuits'])
160
161 1
    @rest("/v2/evc/schedule", methods=["GET"])
    def list_schedules(self, _request: Request) -> JSONResponse:
        """Endpoint to return every schedule stored for every circuit.

        Return a JSON with the following template:
        [{"schedule_id": <schedule_id>,
         "circuit_id": <circuit_id>,
         "schedule": <schedule object>}]

        When no circuit is stored at all, an empty JSON object is returned
        instead.
        """
        log.debug("list_schedules /v2/evc/schedule")
        stored = self.mongo_controller.get_circuits()['circuits'].values()
        status = 200
        if not stored:
            return JSONResponse({}, status_code=status)

        # Circuits without schedules contribute nothing to the listing.
        result = [
            {
                "schedule_id": schedule.get("id"),
                "circuit_id": circuit.get("id"),
                "schedule": schedule,
            }
            for circuit in stored
            for schedule in circuit.get("circuit_scheduler") or []
        ]

        log.debug("list_schedules result %s %s", result, status)
        return JSONResponse(result, status_code=status)
192
193 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
    def get_circuit(self, request: Request) -> JSONResponse:
        """Endpoint to return a single circuit looked up by its id.

        Raises:
            HTTPException: 404 when no circuit has the requested id.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("get_circuit /v2/evc/%s", circuit_id)
        circuit = self.mongo_controller.get_circuit(circuit_id)
        if circuit:
            status = 200
            log.debug("get_circuit result %s %s", circuit, status)
            return JSONResponse(circuit, status_code=status)

        result = f"circuit_id {circuit_id} not found"
        log.debug("get_circuit result %s %s", result, 404)
        raise HTTPException(404, detail=result)
206
207
    # pylint: disable=too-many-branches, too-many-statements
208 1
    @rest("/v2/evc/", methods=["POST"])
    @validate_openapi(spec)
    def create_circuit(self, request: Request) -> JSONResponse:
        """Try to create a new circuit from the request payload.

        Steps, in order:

        1. Build the EVC from the JSON body.
        2. Refuse the request when ``check_disabled_component`` rejects the
           UNIs.
        3. Validate the static ``primary_path`` and ``backup_path``, when
           they are set.
        4. Require equal UNI tag lists and a primary or dynamic path.
        5. Refuse duplicated no-tag UNIs, then reserve the UNI tags.
        6. Save the circuit, store it in the buffer, schedule it and, when
           it has no ``circuit_scheduler``, deploy it right away.
        7. Emit the ``created`` event and notify the user of the result.

        Returns:
            JSONResponse: ``{"circuit_id": <id>}`` with status 201.

        Raises:
            HTTPException: 400 when the payload, the paths, the tag lists
                or the tags are not valid; 409 when a component is disabled
                or a no-tag UNI is duplicated.
        """
        # Try to create the circuit object
        log.debug("create_circuit /v2/evc/")
        data = get_json_or_400(request, self.controller.loop)

        try:
            evc = self._evc_from_dict(data)
        except (ValueError, KytosTagError) as exception:
            log.debug("create_circuit result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception
        try:
            check_disabled_component(evc.uni_a, evc.uni_z)
        except DisabledSwitch as exception:
            log.debug("create_circuit result %s %s", exception, 409)
            raise HTTPException(
                    409,
                    detail=f"Path is not valid: {exception}"
                ) from exception

        # Both static paths go through the very same validation.
        for path_name in ("primary_path", "backup_path"):
            self._validate_static_path(evc, path_name)

        if not evc._tag_lists_equal():
            detail = "UNI_A and UNI_Z tag lists should be the same."
            raise HTTPException(400, detail=detail)

        try:
            evc._validate_has_primary_or_dynamic()
        except ValueError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        try:
            self._check_no_tag_duplication(evc.id, evc.uni_a, evc.uni_z)
        except DuplicatedNoTagUNI as exception:
            log.debug("create_circuit result %s %s", exception, 409)
            raise HTTPException(409, detail=str(exception)) from exception

        try:
            self._use_uni_tags(evc)
        except KytosTagError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        # save circuit
        try:
            evc.sync()
        except ValidationError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        # store circuit in dictionary
        self.circuits[evc.id] = evc

        # Schedule the circuit deploy
        self.sched.add(evc)

        # Circuit has no schedule, deploy now
        if not evc.circuit_scheduler:
            with evc.lock:
                evc.deploy()

        # Notify users
        result = {"circuit_id": evc.id}
        status = 201
        log.debug("create_circuit result %s %s", result, status)
        emit_event(self.controller, name="created",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)

    @staticmethod
    def _validate_static_path(evc, path_name):
        """Validate one static path of the EVC, skipping it when unset.

        Args:
            evc: circuit being created.
            path_name (str): attribute holding the path, either
                "primary_path" or "backup_path"; it also prefixes the
                error detail.

        Raises:
            HTTPException: 400 when the path raises InvalidPath.
        """
        path = getattr(evc, path_name)
        if not path:
            return
        try:
            path.is_valid(
                evc.uni_a.interface.switch,
                evc.uni_z.interface.switch,
                bool(evc.circuit_scheduler),
            )
        except InvalidPath as exception:
            raise HTTPException(
                400,
                detail=f"{path_name} is not valid: {exception}"
            ) from exception
321
322 1
    @staticmethod
    def _use_uni_tags(evc):
        """Reserve the vlan of both UNIs of the EVC.

        When reserving the UNI_Z vlan fails, the UNI_A vlan that was just
        taken is made available again before the error propagates.

        Raises:
            KytosTagError: when either reservation fails.
        """
        first_uni = evc.uni_a
        evc._use_uni_vlan(first_uni)
        try:
            evc._use_uni_vlan(evc.uni_z)
        except KytosTagError:
            evc.make_uni_vlan_available(first_uni)
            raise
332
333 1
    @listen_to('kytos/flow_manager.flow.removed')
    def on_flow_delete(self, event):
        """Listen to flow removals; the work is done by handle_flow_delete."""
        self.handle_flow_delete(event)
337
338 1
    def handle_flow_delete(self, event):
        """Record on the owning EVC that one of its flows was removed.

        The EVC is found by deriving its id from the cookie of the removed
        flow; flows that belong to no buffered EVC are ignored.
        """
        removed_flow = event.content["flow"]
        circuit_id = EVC.get_id_from_cookie(removed_flow.cookie)
        evc = self.circuits.get(circuit_id)
        if not evc:
            return
        log.debug("Flow removed in EVC %s", evc.id)
        evc.set_flow_removed_at()
345
346 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update(self, request: Request) -> JSONResponse:
        """Update a circuit based on payload.

        The EVC attributes (creation_time, active, current_path,
        failover_path, _id, archived) can't be updated.

        After the update, the ``enable`` and ``redeploy`` values returned
        by ``evc.update`` decide whether the flows are removed, redeployed
        or deployed.

        Returns:
            JSONResponse: ``{<circuit_id>: <evc as dict>}`` with status 200.

        Raises:
            HTTPException: 404 when the circuit is unknown; 409 when it is
                archived, a no-tag UNI is duplicated or a switch is
                disabled; 400 when the payload is not valid.
        """
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = request.path_params["circuit_id"]
        log.debug("update /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("update result %s %s", result, 404)
            # Chain from the caught instance so the missing key is kept.
            raise HTTPException(404, detail=result) from error

        if evc.archived:
            result = "Can't update archived EVC"
            log.debug("update result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        try:
            updated_data = self._evc_dict_with_instances(data)
            self._check_no_tag_duplication(
                circuit_id, updated_data.get("uni_a"),
                updated_data.get("uni_z")
            )
            enable, redeploy = evc.update(**updated_data)
        except (ValueError, KytosTagError, ValidationError) as exception:
            log.debug("update result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception
        except DuplicatedNoTagUNI as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(409, detail=str(exception)) from exception
        except DisabledSwitch as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(
                    409,
                    detail=f"Path is not valid: {exception}"
                ) from exception

        if evc.is_active():
            if enable is False:  # disable if active
                with evc.lock:
                    evc.remove()
            elif redeploy is not None:  # redeploy if active
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        else:
            if enable is True:  # enable if inactive
                with evc.lock:
                    evc.deploy()
        result = {evc.id: evc.as_dict()}
        status = 200

        log.debug("update result %s %s", result, status)
        emit_event(self.controller, "updated",
                   content=map_evc_event_content(evc, **data))
        return JSONResponse(result, status_code=status)
408
409 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
    def delete_circuit(self, request: Request) -> JSONResponse:
        """Remove a circuit.

        First, the flows are removed from the switches, and then the EVC is
        deactivated, disabled, unscheduled, archived, stripped of its UNI
        tags and synced.

        Returns:
            JSONResponse: a confirmation message with status 200.

        Raises:
            HTTPException: 404 when the circuit is unknown or was already
                removed (archived).
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("delete_circuit result %s %s", result, 404)
            # Chain from the caught instance so the missing key is kept.
            raise HTTPException(404, detail=result) from error

        if evc.archived:
            result = f"Circuit {circuit_id} already removed"
            log.debug("delete_circuit result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        log.info("Removing %s", evc)
        with evc.lock:
            evc.remove_current_flows()
            evc.remove_failover_flows(sync=False)
            evc.deactivate()
            evc.disable()
            self.sched.remove(evc)
            evc.archive()
            evc.remove_uni_tags()
            evc.sync()
        log.info("EVC removed. %s", evc)
        result = {"response": f"Circuit {circuit_id} removed"}
        status = 200

        log.debug("delete_circuit result %s %s", result, status)
        emit_event(self.controller, "deleted",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)
448
449 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
    def get_metadata(self, request: Request) -> JSONResponse:
        """Return the metadata of one EVC as ``{"metadata": ...}``.

        Raises:
            HTTPException: 404 when the lookup raises KeyError.
        """
        circuit_id = request.path_params["circuit_id"]
        try:
            metadata = self.circuits[circuit_id].metadata
            response = JSONResponse({"metadata": metadata})
        except KeyError as missing:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from missing
        return response
462
463 1 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
    @validate_openapi(spec)
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to a bulk of EVCs.

        The payload carries ``circuit_ids`` plus the metadata itself. The
        stored EVCs are updated first, then every buffered EVC.

        Raises:
            HTTPException: 404 listing the ids that raised KeyError.
        """
        data = get_json_or_400(request, self.controller.loop)
        circuit_ids = data.pop("circuit_ids")

        self.mongo_controller.update_evcs(circuit_ids, data, "add")

        missing = []
        for circuit_id in circuit_ids:
            try:
                self.circuits[circuit_id].extend_metadata(data)
            except KeyError:
                missing.append(circuit_id)

        if missing:
            raise HTTPException(404, detail=missing)
        return JSONResponse("Operation successful", status_code=201)
483
484 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
    @validate_openapi(spec)
    def add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to an EVC.

        The JSON body must be an object; it extends the metadata of the
        circuit, which is then synced.

        Returns:
            JSONResponse: "Operation successful" with status 201.

        Raises:
            HTTPException: 400 when the body is not a JSON object; 404
                when the circuit is unknown.
        """
        circuit_id = request.path_params["circuit_id"]
        metadata = get_json_or_400(request, self.controller.loop)
        if not isinstance(metadata, dict):
            # The message needs the f prefix to show the rejected value.
            raise HTTPException(
                400,
                detail=f"Invalid metadata value: {metadata}"
            )
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error

        evc.extend_metadata(metadata)
        evc.sync()
        return JSONResponse("Operation successful", status_code=201)
503
504 1 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
    @validate_openapi(spec)
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
        """Delete a metadata key from a bulk of EVCs.

        The payload carries ``circuit_ids``. The stored EVCs are updated
        first, then every buffered EVC.

        Raises:
            HTTPException: 404 listing the ids that raised KeyError.
        """
        data = get_json_or_400(request, self.controller.loop)
        key = request.path_params["key"]
        circuit_ids = data.pop("circuit_ids")
        self.mongo_controller.update_evcs(circuit_ids, {key: ""}, "del")

        missing = []
        for circuit_id in circuit_ids:
            try:
                self.circuits[circuit_id].remove_metadata(key)
            except KeyError:
                missing.append(circuit_id)

        if missing:
            raise HTTPException(404, detail=missing)
        return JSONResponse("Operation successful")
524
525 1
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
    def delete_metadata(self, request: Request) -> JSONResponse:
        """Delete one metadata key from an EVC and sync the circuit.

        Raises:
            HTTPException: 404 when the circuit is unknown.
        """
        path_params = request.path_params
        circuit_id = path_params["circuit_id"]
        key = path_params["key"]
        try:
            evc = self.circuits[circuit_id]
        except KeyError as missing:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from missing

        evc.remove_metadata(key)
        evc.sync()
        return JSONResponse("Operation successful")
541
542 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
    def redeploy(self, request: Request) -> JSONResponse:
        """Endpoint to force the redeployment of an EVC.

        An enabled circuit has its current flows removed and is deployed
        again; a disabled one is left untouched.

        Returns:
            JSONResponse: status 202 when the redeploy was carried out,
            409 when the circuit is disabled.

        Raises:
            HTTPException: 404 when the circuit is unknown.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            # Chain from the caught instance so the missing key is kept.
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found"
            ) from error
        if evc.is_enabled():
            with evc.lock:
                evc.remove_current_flows()
                evc.deploy()
            result = {"response": f"Circuit {circuit_id} redeploy received."}
            status = 202
        else:
            result = {"response": f"Circuit {circuit_id} is disabled."}
            status = 409

        return JSONResponse(result, status_code=status)
565
566 1
    @rest("/v2/evc/schedule/", methods=["POST"])
    @validate_openapi(spec)
    def create_schedule(self, request: Request) -> JSONResponse:
        """
        Create a new schedule for a given circuit.

        This service does not check for conflicts with another schedule.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }

        Raises:
            HTTPException: 404 when the circuit is unknown; 409 when it is
                archived.
        """
        log.debug("create_schedule /v2/evc/schedule/")
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = data["circuit_id"]
        schedule_data = data["schedule"]

        # Look the EVC up in the circuits buffer
        evc = self._get_circuits_buffer().get(circuit_id)
        if not evc:
            result = f"circuit_id {circuit_id} not found"
            log.debug("create_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # Deleted and archived circuits can not be modified
        if evc.archived:
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
            log.debug("create_schedule result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        new_schedule = CircuitSchedule.from_dict(schedule_data)

        # Start the schedule list when the circuit has none yet
        if not evc.circuit_scheduler:
            evc.circuit_scheduler = []
        evc.circuit_scheduler.append(new_schedule)

        # Register the job, then save the circuit to mongodb
        self.sched.add_circuit_job(evc, new_schedule)
        evc.sync()

        result = new_schedule.as_dict()
        status = 201

        log.debug("create_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)
627
628 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update_schedule(self, request: Request) -> JSONResponse:
        """Update a schedule.

        Replace every attribute of the given schedule of an EVC with the
        payload values. The schedule ID is preserved.
        Payload example:
            {
              "date": "2019-08-07T14:52:10.967Z",
              "interval": "string",
              "frequency": "1 * * * *",
              "action": "create"
            }

        Raises:
            HTTPException: 404 when the schedule is unknown; 409 when its
                circuit is archived.
        """
        data = get_json_or_400(request, self.controller.loop)
        schedule_id = request.path_params["schedule_id"]
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)

        # Try to find the circuit that owns the schedule
        evc, old_schedule = self._find_evc_by_schedule_id(schedule_id)
        if not old_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("update_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # Deleted and archived circuits can not be modified
        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("update_schedule result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        new_schedule = CircuitSchedule.from_dict(data)
        new_schedule.id = old_schedule.id

        # Swap the old schedule for the modified one
        evc.circuit_scheduler.remove(old_schedule)
        evc.circuit_scheduler.append(new_schedule)

        # Cancel the old job and register the new circuit schedule
        self.sched.cancel_job(old_schedule.id)
        self.sched.add_circuit_job(evc, new_schedule)

        # Save EVC to mongodb
        evc.sync()

        result = new_schedule.as_dict()
        status = 200

        log.debug("update_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)
679
680 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
    def delete_schedule(self, request: Request) -> JSONResponse:
        """Remove a circuit schedule.

        Remove the schedule from the EVC.
        Cancel the schedule job.
        Sync the EVC.

        Raises:
            HTTPException: 404 when the schedule is unknown; 409 when its
                circuit is archived.
        """
        schedule_id = request.path_params["schedule_id"]
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
        evc, old_schedule = self._find_evc_by_schedule_id(schedule_id)

        if not old_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("delete_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # Deleted and archived circuits can not be modified
        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("delete_schedule result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        # Drop the schedule and cancel its job
        evc.circuit_scheduler.remove(old_schedule)
        self.sched.cancel_job(old_schedule.id)

        # Save EVC to mongodb
        evc.sync()

        result = "Schedule removed"
        status = 200

        log.debug("delete_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)
716
717 1
    def _check_no_tag_duplication(
        self,
        evc_id: str,
        uni_a: Optional[UNI] = None,
        uni_z: Optional[UNI] = None
    ):
        """Check the given untagged UNIs against every other circuit.

        Each non-archived circuit whose ``_id`` differs from ``evc_id`` is
        asked, through ``check_no_tag_duplicate``, to validate every given
        UNI whose ``user_tag`` is None. Per the original documentation,
        DuplicatedNoTagUNI is raised when a duplication is found.

        Args:
            evc_id: Id of the EVC under validation; it is skipped.
            uni_a: First UNI to validate, if any.
            uni_z: Second UNI to validate, if any.
        """
        def lacks_tag(uni):
            return bool(uni) and not uni.user_tag

        # Nothing to verify unless at least one UNI comes without a tag.
        if not (lacks_tag(uni_a) or lacks_tag(uni_z)):
            return

        untagged = [
            uni for uni in (uni_a, uni_z)
            if uni and uni.user_tag is None
        ]
        # Iterate over a copy of the circuits mapping.
        for circuit in self.circuits.copy().values():
            if circuit.archived or circuit._id == evc_id:
                continue
            for uni in untagged:
                circuit.check_no_tag_duplicate(uni)
742
743 1
    @listen_to("kytos/topology.link_up")
    def on_link_up(self, event):
        """Listener for ``kytos/topology.link_up``.

        Delegates the event to ``handle_link_up``.
        """
        self.handle_link_up(event)
747
748 1
    def handle_link_up(self, event):
        """Tell every enabled, non-archived EVC that a link came up.

        EVCs are visited in the order given by ``get_evcs_by_svc_level``
        and each one is notified while holding its own lock.
        """
        log.info("Event handle_link_up %s", event.content["link"])
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_enabled() or evc.archived:
                continue
            with evc.lock:
                evc.handle_link_up(event.content["link"])
755
756
    # Possibly replace this with interruptions?
757 1
    @listen_to(
        '.*.switch.interface.(link_up|link_down|created|deleted)',
        pool='dynamic_single'
    )
    def on_interface_link_change(self, event: KytosEvent):
        """Dispatch interface events to the up/down handlers.

        The last dot-separated part of the event name selects the handler:
        ``link_up`` and ``created`` are treated as "up", ``link_down`` and
        ``deleted`` as "down". Any other suffix is ignored. The dispatch
        runs while holding ``self._lock``.
        """
        with self._lock:
            event_type = event.name.rpartition('.')[2]
            iface = event.content.get("interface")
            dispatch = {
                'link_up': self.handle_interface_link_up,
                'created': self.handle_interface_link_up,
                'link_down': self.handle_interface_link_down,
                'deleted': self.handle_interface_link_down,
            }
            handler = dispatch.get(event_type)
            if handler is not None:
                handler(iface)
772
773 1
    def handle_interface_link_up(self, interface):
        """Forward an interface "up" notification to every EVC.

        Args:
            interface: Interface taken from the event content; it is passed
                unchanged to each EVC's ``handle_interface_link_up``.
        """
        # Log once per event (not once per EVC), as handle_link_up does.
        log.info("Event handle_interface_link_up %s", interface)
        for evc in self.get_evcs_by_svc_level():
            evc.handle_interface_link_up(interface)
782
783 1
    def handle_interface_link_down(self, interface):
        """Forward an interface "down" notification to every EVC.

        Args:
            interface: Interface taken from the event content; it is passed
                unchanged to each EVC's ``handle_interface_link_down``.
        """
        # Log once per event (not once per EVC), as handle_link_down does.
        log.info("Event handle_interface_link_down %s", interface)
        for evc in self.get_evcs_by_svc_level():
            evc.handle_interface_link_down(interface)
792
793 1
    @listen_to("kytos/topology.link_down")
    def on_link_down(self, event):
        """Listener for ``kytos/topology.link_down``.

        Delegates the event to ``handle_link_down``.
        """
        self.handle_link_down(event)
797
798 1
    def handle_link_down(self, event):
        """React to a link going down.

        Steps, in order:

        1. Classify every EVC (see ``_classify_evcs_for_link_down``).
        2. Send the failover flows to flow_manager in batches.
        3. Swap current/failover paths of the EVCs that can fail over and
           announce ``redeployed_link_down`` for each.
        4. Emit ``evc_affected_by_link_down`` for the affected EVCs that
           cannot use a failover path.
        5. For EVCs whose current path is untouched, rebuild the failover
           path if it crosses the link.

        Args:
            event: Event whose ``content["link"]`` is the link that went
                down.
        """
        link = event.content["link"]
        log.info("Event handle_link_down %s", link)

        (evcs_with_failover, evcs_normal,
         check_failover, switch_flows) = self._classify_evcs_for_link_down(
            link
        )

        self._install_flows_in_batches(switch_flows)
        self._activate_failover_paths(evcs_with_failover, link)

        for evc in evcs_normal:
            emit_event(
                self.controller,
                "evc_affected_by_link_down",
                content={"link_id": link.id} | map_evc_event_content(evc)
            )

        # After handling the hot path, check if new failover paths are
        # needed. EVCs affected by the link down generate a KytosEvent for
        # deployed|redeployed, which triggers the failover path setup, so
        # only the check_failover list needs further inspection here.
        for evc in check_failover:
            if evc.is_failover_path_affected_by_link(link):
                with evc.lock:
                    evc.setup_failover_path()

    def _classify_evcs_for_link_down(self, link):
        """Sort the EVCs by how the link-down must be handled.

        Returns:
            tuple: ``(evcs_with_failover, evcs_normal, check_failover,
            switch_flows)`` where

            * ``evcs_with_failover`` are affected EVCs whose failover flows
              were obtained successfully;
            * ``evcs_normal`` are affected EVCs without a usable failover
              path (missing, also crossing the link, or whose
              ``get_failover_flows`` raised);
            * ``check_failover`` are EVCs not affected by the link;
            * ``switch_flows`` maps each dpid to the failover flows
              collected for it.
        """
        switch_flows = {}
        evcs_with_failover = []
        evcs_normal = []
        check_failover = []
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_affected_by_link(link):
                check_failover.append(evc)
                continue

            # Without a usable failover path, handle the link down the
            # traditional way.
            if (
                not getattr(evc, 'failover_path', None) or
                evc.is_failover_path_affected_by_link(link)
            ):
                evcs_normal.append(evc)
                continue

            try:
                dpid_flows = evc.get_failover_flows()
            # Any failure here only downgrades the EVC to the traditional
            # handling; it must not abort the processing of other EVCs.
            # pylint: disable=broad-except
            except Exception:
                err = traceback.format_exc().replace("\n", ", ")
                log.error(
                    f"Ignore Failover path for {evc} due to error: {err}"
                )
                evcs_normal.append(evc)
                continue

            for dpid, flows in dpid_flows.items():
                switch_flows.setdefault(dpid, []).extend(flows)
            evcs_with_failover.append(evc)

        return evcs_with_failover, evcs_normal, check_failover, switch_flows

    def _install_flows_in_batches(self, switch_flows):
        """Emit ``flows.install`` events, a batch per switch per round.

        ``switch_flows`` is consumed in place. Each round sends up to
        ``settings.BATCH_SIZE`` flows per dpid (all of them when the
        setting is falsy) and then sleeps ``settings.BATCH_INTERVAL``.
        """
        while switch_flows:
            offset = settings.BATCH_SIZE or None
            # Snapshot the keys: entries are deleted while iterating.
            for dpid in list(switch_flows.keys()):
                emit_event(
                    self.controller,
                    context="kytos.flow_manager",
                    name="flows.install",
                    content={
                        "dpid": dpid,
                        "flow_dict": {"flows": switch_flows[dpid][:offset]},
                    }
                )
                if offset is None or offset >= len(switch_flows[dpid]):
                    del switch_flows[dpid]
                    continue
                switch_flows[dpid] = switch_flows[dpid][offset:]
            time.sleep(settings.BATCH_INTERVAL)

    def _activate_failover_paths(self, evcs, link):
        """Swap current and failover paths of ``evcs`` and announce it.

        For each EVC the swap and ``sync()`` happen under the EVC lock;
        the ``redeployed_link_down`` event and the log entry follow after
        the lock is released.
        """
        for evc in evcs:
            with evc.lock:
                old_path = evc.current_path
                evc.current_path = evc.failover_path
                evc.failover_path = old_path
                evc.sync()
            emit_event(self.controller, "redeployed_link_down",
                       content=map_evc_event_content(evc))
            log.info(
                f"{evc} redeployed with failover due to link down {link.id}"
            )
879
880 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
    def on_evc_affected_by_link_down(self, event):
        """Listener for ``kytos/mef_eline.evc_affected_by_link_down``.

        Delegates the event to ``handle_evc_affected_by_link_down``.
        """
        self.handle_evc_affected_by_link_down(event)
884
885 1
    def handle_evc_affected_by_link_down(self, event):
        """Run the link-down handling of a single EVC and report the result.

        The EVC is looked up by ``event.content["evc_id"]``; unknown ids
        are ignored. ``evc.handle_link_down()`` runs under the EVC lock,
        then ``redeployed_link_down`` is emitted when it returns a truthy
        value and ``error_redeploy_link_down`` otherwise.
        """
        evc = self.circuits.get(event.content["evc_id"])
        link_id = event.content['link_id']
        if not evc:
            return

        with evc.lock:
            redeployed = evc.handle_link_down()

        if redeployed:
            log.info(f"{evc} redeployed due to link down {link_id}")
            event_name = "redeployed_link_down"
        else:
            event_name = "error_redeploy_link_down"
        emit_event(self.controller, event_name,
                   content=map_evc_event_content(evc))
899
900 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
    def on_evc_deployed(self, event):
        """Listener for deployed, redeployed_link_up and redeployed_link_down.

        Delegates the event to ``handle_evc_deployed``.
        """
        self.handle_evc_deployed(event)
904
905 1
    def handle_evc_deployed(self, event):
        """Set up the failover path of the EVC named in the event.

        The EVC is looked up by ``event.content["evc_id"]``; nothing
        happens when it is not found. The setup runs under the EVC lock.
        """
        evc = self.circuits.get(event.content["evc_id"])
        if evc:
            with evc.lock:
                evc.setup_failover_path()
912
913 1
    @listen_to("kytos/topology.topology_loaded")
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
        """Listener for ``kytos/topology.topology_loaded``.

        The event itself is not inspected; it only triggers
        ``load_all_evcs``.
        """
        self.load_all_evcs()
917
918 1
    def load_all_evcs(self):
        """Load the stored circuits that are not in memory yet.

        Circuits come from ``self.mongo_controller.get_circuits()``. Ids
        already present in ``self.circuits`` are skipped. An
        ``evcs_loaded`` event carrying all stored circuits is emitted at
        the end.
        """
        stored = self.mongo_controller.get_circuits()['circuits'].items()
        for circuit_id, circuit in stored:
            if circuit_id in self.circuits:
                continue
            self._load_evc(circuit)
        emit_event(self.controller, "evcs_loaded", content=dict(stored))
925
926 1
    def _load_evc(self, circuit_dict):
927
        """Load one EVC from mongodb to memory."""
928 1
        try:
929 1
            evc = self._evc_from_dict(circuit_dict)
930 1
        except (ValueError, KytosTagError) as exception:
931 1
            log.error(
932
                f"Could not load EVC: dict={circuit_dict} error={exception}"
933
            )
934 1
            return None
935 1
        if evc.archived:
936 1
            return None
937
938 1
        self.circuits.setdefault(evc.id, evc)
939 1
        self.sched.add(evc)
940 1
        return evc
941
942 1
    @listen_to("kytos/flow_manager.flow.error")
    def on_flow_mod_error(self, event):
        """Listener for ``kytos/flow_manager.flow.error``.

        Delegates the event to ``handle_flow_mod_error``.
        """
        self.handle_flow_mod_error(event)
946
947 1
    def handle_flow_mod_error(self, event):
        """Remove the current flows of the EVC whose flow "add" failed.

        Only events whose ``error_command`` is ``"add"`` are processed.
        The EVC is identified from the flow cookie; when it is known, its
        current flows are removed under the EVC lock.
        """
        flow = event.content["flow"]
        if event.content.get("error_command") != "add":
            return

        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
        if not evc:
            return
        with evc.lock:
            evc.remove_current_flows()
957
958 1
    def _evc_dict_with_instances(self, evc_dict):
959
        """Convert some dict values to instance of EVC classes.
960
961
        This method will convert: [UNI, Link]
962
        """
963 1
        data = evc_dict.copy()  # Do not modify the original dict
964 1
        for attribute, value in data.items():
965
            # Get multiple attributes.
966
            # Ex: uni_a, uni_z
967 1
            if "uni" in attribute:
968 1
                try:
969 1
                    data[attribute] = self._uni_from_dict(value)
970 1
                except ValueError as exception:
971 1
                    result = "Error creating UNI: Invalid value"
972 1
                    raise ValueError(result) from exception
973
974 1
            if attribute == "circuit_scheduler":
975 1
                data[attribute] = []
976 1
                for schedule in value:
977 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
978
979
            # Get multiple attributes.
980
            # Ex: primary_links,
981
            #     backup_links,
982
            #     current_links_cache,
983
            #     primary_links_cache,
984
            #     backup_links_cache
985 1
            if "links" in attribute:
986 1
                data[attribute] = [
987
                    self._link_from_dict(link) for link in value
988
                ]
989
990
            # Ex: current_path,
991
            #     primary_path,
992
            #     backup_path
993 1
            if "path" in attribute and attribute != "dynamic_backup_path":
994 1
                data[attribute] = Path(
995
                    [self._link_from_dict(link) for link in value]
996
                )
997
998 1
        return data
999
1000 1
    def _evc_from_dict(self, evc_dict):
        """Instantiate an EVC from its dict representation.

        Nested values are converted by ``_evc_dict_with_instances`` and
        ``table_group`` is always taken from ``self.table_group``.
        """
        converted = self._evc_dict_with_instances(evc_dict)
        kwargs = {**converted, "table_group": self.table_group}
        return EVC(self.controller, **kwargs)
1004
1005 1
    def _uni_from_dict(self, uni_dict):
1006
        """Return a UNI object from python dict."""
1007 1
        if uni_dict is None:
1008 1
            return False
1009
1010 1
        interface_id = uni_dict.get("interface_id")
1011 1
        interface = self.controller.get_interface_by_id(interface_id)
1012 1
        if interface is None:
1013 1
            result = (
1014
                "Error creating UNI:"
1015
                + f"Could not instantiate interface {interface_id}"
1016
            )
1017 1
            raise ValueError(result) from ValueError
1018 1
        tag_convert = {1: "vlan"}
1019 1
        tag_dict = uni_dict.get("tag", None)
1020 1
        if tag_dict:
1021 1
            tag_type = tag_dict.get("tag_type")
1022 1
            tag_type = tag_convert.get(tag_type, tag_type)
1023 1
            tag_value = tag_dict.get("value")
1024 1
            if isinstance(tag_value, list):
1025 1
                tag_value = get_tag_ranges(tag_value)
1026 1
                mask_list = get_vlan_tags_and_masks(tag_value)
1027 1
                tag = TAGRange(tag_type, tag_value, mask_list)
1028
            else:
1029 1
                tag = TAG(tag_type, tag_value)
1030
        else:
1031 1
            tag = None
1032 1
        uni = UNI(interface, tag)
1033 1
        return uni
1034
1035 1
    def _link_from_dict(self, link_dict):
1036
        """Return a Link object from python dict."""
1037 1
        id_a = link_dict.get("endpoint_a").get("id")
1038 1
        id_b = link_dict.get("endpoint_b").get("id")
1039
1040 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
1041 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
1042 1
        if not endpoint_a:
1043 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
1044 1
            raise ValueError(error_msg)
1045 1
        if not endpoint_b:
1046
            error_msg = f"Could not get interface endpoint_b id {id_b}"
1047
            raise ValueError(error_msg)
1048
1049 1
        link = Link(endpoint_a, endpoint_b)
1050 1
        if "metadata" in link_dict:
1051 1
            link.extend_metadata(link_dict.get("metadata"))
1052
1053 1
        s_vlan = link.get_metadata("s_vlan")
1054 1
        if s_vlan:
1055 1
            tag = TAG.from_dict(s_vlan)
1056 1
            if tag is False:
1057
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
1058
                raise ValueError(error_msg)
1059 1
            link.update_metadata("s_vlan", tag)
1060 1
        return link
1061
1062 1
    def _find_evc_by_schedule_id(self, schedule_id):
1063
        """
1064
        Find an EVC and CircuitSchedule based on schedule_id.
1065
1066
        :param schedule_id: Schedule ID
1067
        :return: EVC and Schedule
1068
        """
1069 1
        circuits = self._get_circuits_buffer()
1070 1
        found_schedule = None
1071 1
        evc = None
1072
1073
        # pylint: disable=unused-variable
1074 1
        for c_id, circuit in circuits.items():
1075 1
            for schedule in circuit.circuit_scheduler:
1076 1
                if schedule.id == schedule_id:
1077 1
                    found_schedule = schedule
1078 1
                    evc = circuit
1079 1
                    break
1080 1
            if found_schedule:
1081 1
                break
1082 1
        return evc, found_schedule
1083
1084 1
    def _get_circuits_buffer(self):
1085
        """
1086
        Return the circuit buffer.
1087
1088
        If the buffer is empty, try to load data from mongodb.
1089
        """
1090 1
        if not self.circuits:
1091
            # Load circuits from mongodb to buffer
1092 1
            circuits = self.mongo_controller.get_circuits()['circuits']
1093 1
            for c_id, circuit in circuits.items():
1094 1
                evc = self._evc_from_dict(circuit)
1095 1
                self.circuits[c_id] = evc
1096 1
        return self.circuits
1097
1098
    # pylint: disable=attribute-defined-outside-init
1099 1
    @alisten_to("kytos/of_multi_table.enable_table")
    async def on_table_enabled(self, event):
        """Apply the table groups announced for mef_eline.

        The ``mef_eline`` entry of the event content is merged into
        ``self.table_group`` and announced with a
        ``kytos/mef_eline.enable_table`` event. Nothing is applied when
        the entry is missing/empty or when any of its groups is not in
        ``settings.TABLE_GROUP_ALLOWED`` (that case is logged as error).
        """
        table_group = event.content.get("mef_eline", None)
        if not table_group:
            return

        # Validate every group before touching self.table_group.
        for group in table_group:
            if group in settings.TABLE_GROUP_ALLOWED:
                continue
            log.error(f'The table group "{group}" is not allowed for '
                      f'mef_eline. Allowed table groups are '
                      f'{settings.TABLE_GROUP_ALLOWED}')
            return

        self.table_group.update(table_group)
        await aemit_event(
            self.controller,
            "kytos/mef_eline.enable_table",
            {"group_table": self.table_group},
        )
1115