Passed
Pull Request — master (#349)
by
unknown
03:33
created

build.main   F

Complexity

Total Complexity 182

Size/Duplication

Total Lines 1040
Duplicated Lines 3.85 %

Test Coverage

Coverage 93.26%

Importance

Changes 0
Metric Value
eloc 682
dl 40
loc 1040
ccs 567
cts 608
cp 0.9326
rs 1.918
c 0
b 0
f 0
wmc 182

47 Methods

Rating   Name   Duplication   Size   Complexity  
A Main.shutdown() 0 2 1
C Main.execute_consistency() 0 26 9
A Main.setup() 0 28 1
A Main.execute() 0 8 3
A Main.get_evcs_by_svc_level() 0 7 2
A Main.get_eline_controller() 0 4 1
A Main.list_circuits() 0 15 1
A Main.add_metadata() 0 19 3
C Main._evc_dict_with_instances() 0 41 9
A Main.on_flow_delete() 0 4 1
A Main.on_link_down() 0 4 1
A Main.handle_evc_deployed() 0 7 3
A Main.update_schedule() 0 51 3
A Main._evc_from_dict() 0 4 1
A Main._find_evc_by_schedule_id() 0 21 5
A Main.load_all_evcs() 0 6 3
A Main.get_metadata() 0 13 2
A Main.delete_metadata() 0 16 2
A Main.handle_link_up() 0 7 5
A Main.on_link_up() 0 4 1
A Main.on_evc_affected_by_link_down() 0 4 1
A Main.delete_schedule() 0 36 3
A Main.get_circuit() 0 13 2
A Main.handle_evc_affected_by_link_down() 0 14 4
F Main.handle_link_down() 0 70 15
A Main.on_evc_deployed() 0 4 1
B Main.handle_topology_update() 0 14 8
C Main.should_be_checked() 0 17 9
A Main.on_topology_update() 0 4 1
B Main.list_schedules() 0 31 5
D Main.create_circuit() 0 105 12
B Main._link_from_dict() 0 26 6
A Main._uni_from_dict() 0 22 4
A Main.redeploy() 0 23 4
A Main.handle_flow_mod_error() 0 9 3
A Main._is_duplicated_evc() 0 14 4
D Main.update() 0 59 13
A Main.handle_flow_delete() 0 7 2
A Main.bulk_add_metadata() 20 20 4
A Main.on_flow_mod_error() 0 4 1
A Main.bulk_delete_metadata() 20 20 4
A Main.on_topology_loaded() 0 4 1
A Main.on_table_enabled() 0 16 4
A Main._load_evc() 0 17 3
A Main.create_schedule() 0 61 4
A Main._get_circuits_buffer() 0 13 3
A Main.delete_circuit() 0 38 4

How to fix

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
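As an illustration only: in the methods table above, bulk_add_metadata() and bulk_delete_metadata() each report 20 duplicated lines. Both walk a list of circuit ids, apply one operation, and collect the ids that were not found. A minimal sketch of pulling that shared loop into one helper could look like the following. The helper name apply_to_circuits and the plain dicts standing in for EVC objects are assumptions made for this example, not part of the analysed project.

```python
from typing import Callable


def apply_to_circuits(circuits: dict, circuit_ids: list,
                      action: Callable) -> list:
    """Apply `action` to every known circuit; return the ids not found.

    Hypothetical helper: it holds the loop that would otherwise be
    repeated in each bulk operation.
    """
    missing = []
    for circuit_id in circuit_ids:
        try:
            circuit = circuits[circuit_id]
        except KeyError:
            missing.append(circuit_id)
            continue
        action(circuit)
    return missing


# Plain dicts stand in for EVC objects in this illustration.
circuits = {"a": {"owner": "x"}, "b": {}}

# "Bulk add": only the action differs between the two callers.
missing_on_add = apply_to_circuits(
    circuits, ["a", "zz"], lambda c: c.update({"tag": 1})
)

# "Bulk delete": same loop, different action.
missing_on_del = apply_to_circuits(
    circuits, ["a", "b"], lambda c: c.pop("owner", None)
)
```

Each caller then shrinks to parsing its request, calling the helper with its own action, and turning a non-empty result into an error response.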

Common duplication problems, and corresponding solutions are:

Complexity

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like build.main often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common way to find such a component is to look for fields or methods that share the same prefix or suffix.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
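For example, the methods table lists get_metadata(), add_metadata() and delete_metadata(), which share the _metadata suffix and so may form one cohesive component. The sketch below shows the general shape of Extract Class under that assumption. The names MetadataService and App, and the dict-based circuit records, are hypothetical stand-ins chosen for illustration and do not describe how the project is or should be structured.

```python
class MetadataService:
    """Hypothetical extracted component: only the metadata operations."""

    def __init__(self, circuits: dict):
        # Shared circuit buffer, keyed by circuit id.
        self._circuits = circuits

    def get(self, circuit_id: str) -> dict:
        return self._circuits[circuit_id]["metadata"]

    def add(self, circuit_id: str, metadata: dict) -> None:
        self._circuits[circuit_id]["metadata"].update(metadata)

    def delete(self, circuit_id: str, key: str) -> None:
        # Removing an absent key is treated as a no-op in this sketch.
        self._circuits[circuit_id]["metadata"].pop(key, None)


class App:
    """Stand-in for the large class; it delegates instead of implementing."""

    def __init__(self):
        self.circuits = {"evc1": {"metadata": {}}}
        self.metadata = MetadataService(self.circuits)


app = App()
app.metadata.add("evc1", {"owner": "noc"})
app.metadata.delete("evc1", "missing-key")
```

The large class keeps one attribute per extracted component, so its own method count and complexity drop while callers still reach the behaviour through it.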

# pylint: disable=protected-access, too-many-lines
"""Main module of kytos/mef_eline Kytos Network Application.

NApp to provision circuits from user request.
"""
import pathlib
import time
from threading import Lock

from pydantic import ValidationError

from kytos.core import KytosNApp, log, rest
from kytos.core.helpers import (alisten_to, listen_to, load_spec,
                                validate_openapi)
from kytos.core.interface import TAG, UNI
from kytos.core.link import Link
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
                                 get_json_or_400)
from napps.kytos.mef_eline import controllers, settings
from napps.kytos.mef_eline.exceptions import DisabledSwitch, InvalidPath
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
                                          Path)
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
                                         emit_event, map_evc_event_content)


# pylint: disable=too-many-public-methods
class Main(KytosNApp):
    """Main class of amlight/mef_eline NApp.

    This class is the entry point for this napp.
    """

    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")

    def setup(self):
        """Replace the '__init__' method for the KytosNApp subclass.

        The setup method is automatically called by the controller when your
        application is loaded.

        So, if you have any setup routine, insert it here.
        """
        # object used to schedule circuit events
        self.sched = Scheduler()

        # object to save and load circuits
        self.mongo_controller = self.get_eline_controller()
        self.mongo_controller.bootstrap_indexes()

        # set the controller that will manage the dynamic paths
        DynamicPathManager.set_controller(self.controller)

        # dictionary of EVCs created. It acts as a circuit buffer.
        # Every create/update/delete must be synced to mongodb.
        self.circuits = {}

        self.table_group = {"epl": 0, "evpl": 0}
        self._lock = Lock()
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

        self.load_all_evcs()
        self._topology_updated_at = None

    def get_evcs_by_svc_level(self) -> list:
        """Get circuits sorted by desc service level and asc creation_time.

        In the future, as more ops are offloaded, it should come from the DB.
        """
        return sorted(self.circuits.values(),
                      key=lambda x: (-x.service_level, x.creation_time))

    @staticmethod
    def get_eline_controller():
        """Return the ELineController instance."""
        return controllers.ELineController()

    def execute(self):
        """Execute once when the napp is running."""
        if self._lock.locked():
            return
        log.debug("Starting consistency routine")
        with self._lock:
            self.execute_consistency()
        log.debug("Finished consistency routine")

    @staticmethod
    def should_be_checked(circuit, switches):
        "Verify if the circuit meets the necessary conditions to be checked"
        # pylint: disable=too-many-boolean-expressions
        if (
                circuit.is_enabled()
                and not circuit.is_active()
                and not circuit.lock.locked()
                and not circuit.has_recent_removed_flow()
                and not circuit.is_recent_updated()
                and circuit.are_unis_active(switches)
                # if an inter-switch EVC does not have current_path, it does
                # not make sense to run sdntrace on it
                and (circuit.is_intra_switch() or circuit.current_path)
                ):
            return True
        return False

    def execute_consistency(self):
        """Execute consistency routine."""
        circuits_to_check = []
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
        for circuit in self.get_evcs_by_svc_level():
            stored_circuits.pop(circuit.id, None)
            if self.should_be_checked(circuit, self.controller.switches):
                circuits_to_check.append(circuit)
        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
        for circuit in circuits_to_check:
            is_checked = circuits_checked.get(circuit.id)
            if is_checked:
                circuit.execution_rounds = 0
                log.info(f"{circuit} enabled but inactive - activating")
                with circuit.lock:
                    circuit.activate()
                    circuit.sync()
            else:
                circuit.execution_rounds += 1
                if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
                    log.info(f"{circuit} enabled but inactive - redeploy")
                    with circuit.lock:
                        circuit.deploy()
        for circuit_id in stored_circuits:
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
            self._load_evc(stored_circuits[circuit_id])

    def shutdown(self):
        """Execute when your napp is unloaded.

        If you have some cleanup procedure, insert it here.
        """

    @rest("/v2/evc/", methods=["GET"])
    def list_circuits(self, request: Request) -> JSONResponse:
        """Endpoint to return circuits stored.

        The archived query arg, if defined (not null), filters the result
        accordingly; by default only non-archived EVCs are listed.
        """
        log.debug("list_circuits /v2/evc")
        args = request.query_params
        archived = args.get("archived", "false").lower()
        args = {k: v for k, v in args.items() if k not in {"archived"}}
        circuits = self.mongo_controller.get_circuits(archived=archived,
                                                      metadata=args)
        circuits = circuits['circuits']
        return JSONResponse(circuits)

    @rest("/v2/evc/schedule", methods=["GET"])
    def list_schedules(self, _request: Request) -> JSONResponse:
        """Endpoint to return all schedules stored for all circuits.

        Return a JSON with the following template:
        [{"schedule_id": <schedule_id>,
         "circuit_id": <circuit_id>,
         "schedule": <schedule object>}]
        """
        log.debug("list_schedules /v2/evc/schedule")
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
        if not circuits:
            result = {}
            status = 200
            return JSONResponse(result, status_code=status)

        result = []
        status = 200
        for circuit in circuits:
            circuit_scheduler = circuit.get("circuit_scheduler")
            if circuit_scheduler:
                for scheduler in circuit_scheduler:
                    value = {
                        "schedule_id": scheduler.get("id"),
                        "circuit_id": circuit.get("id"),
                        "schedule": scheduler,
                    }
                    result.append(value)

        log.debug("list_schedules result %s %s", result, status)
        return JSONResponse(result, status_code=status)

    @rest("/v2/evc/{circuit_id}", methods=["GET"])
    def get_circuit(self, request: Request) -> JSONResponse:
        """Endpoint to return a circuit based on id."""
        circuit_id = request.path_params["circuit_id"]
        log.debug("get_circuit /v2/evc/%s", circuit_id)
        circuit = self.mongo_controller.get_circuit(circuit_id)
        if not circuit:
            result = f"circuit_id {circuit_id} not found"
            log.debug("get_circuit result %s %s", result, 404)
            raise HTTPException(404, detail=result)
        status = 200
        log.debug("get_circuit result %s %s", circuit, status)
        return JSONResponse(circuit, status_code=status)

    @rest("/v2/evc/", methods=["POST"])
    @validate_openapi(spec)
    def create_circuit(self, request: Request) -> JSONResponse:
        """Try to create a new circuit.

        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
        UNI_Z's requested C-VID are available from the interfaces' pools. This
        is checked when creating the UNI object.

        Then, E-Line NApp requests a primary and a backup path to the
        Pathfinder NApp using the attributes primary_links and backup_links
        submitted via REST

        # For each link composing paths in #3:
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
        #  - Using the S-VID obtained, generate abstract flow entries to be
        #    sent to FlowManager

        Push abstract flow entries to FlowManager and FlowManager pushes
        OpenFlow entries to datapaths

        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
        creation

        Finally, notify user of the status of its request.
        """
        # Try to create the circuit object
        log.debug("create_circuit /v2/evc/")
        data = get_json_or_400(request, self.controller.loop)

        try:
            evc = self._evc_from_dict(data)
        except ValueError as exception:
            log.debug("create_circuit result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception

        try:
            check_disabled_component(evc.uni_a, evc.uni_z)
        except DisabledSwitch as exception:
            log.debug("create_circuit result %s %s", exception, 409)
            raise HTTPException(
                    409,
                    detail=f"Path is not valid: {exception}"
                ) from exception

        if evc.primary_path:
            try:
                evc.primary_path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise HTTPException(
                    400,
                    detail=f"primary_path is not valid: {exception}"
                ) from exception
        if evc.backup_path:
            try:
                evc.backup_path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise HTTPException(
                    400,
                    detail=f"backup_path is not valid: {exception}"
                ) from exception

        # verify duplicated evc
        if self._is_duplicated_evc(evc):
            result = "The EVC already exists."
            log.debug("create_circuit result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        try:
            evc._validate_has_primary_or_dynamic()
        except ValueError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        # save circuit
        try:
            evc.sync()
        except ValidationError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        # store circuit in dictionary
        self.circuits[evc.id] = evc

        # Schedule the circuit deploy
        self.sched.add(evc)

        # Circuit has no schedule, deploy now
        if not evc.circuit_scheduler:
            with evc.lock:
                evc.deploy()

        # Notify users
        result = {"circuit_id": evc.id}
        status = 201
        log.debug("create_circuit result %s %s", result, status)
        emit_event(self.controller, name="created",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)

    @listen_to('kytos/flow_manager.flow.removed')
    def on_flow_delete(self, event):
        """Capture delete messages to keep track when flows got removed."""
        self.handle_flow_delete(event)

    def handle_flow_delete(self, event):
        """Keep track when the EVC got flows removed by deriving its cookie."""
        flow = event.content["flow"]
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
        if evc:
            log.debug("Flow removed in EVC %s", evc.id)
            evc.set_flow_removed_at()

    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update(self, request: Request) -> JSONResponse:
        """Update a circuit based on payload.

        The EVC attributes (creation_time, active, current_path,
        failover_path, _id, archived) can't be updated.
        """
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = request.path_params["circuit_id"]
        log.debug("update /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError:
            result = f"circuit_id {circuit_id} not found"
            log.debug("update result %s %s", result, 404)
            raise HTTPException(404, detail=result) from KeyError

        if evc.archived:
            result = "Can't update archived EVC"
            log.debug("update result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        try:
            enable, redeploy = evc.update(
                **self._evc_dict_with_instances(data)
            )
        except ValidationError as exception:
            raise HTTPException(400, detail=str(exception)) from exception
        except ValueError as exception:
            log.error(exception)
            log.debug("update result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception
        except DisabledSwitch as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(
                    409,
                    detail=f"Path is not valid: {exception}"
                ) from exception

        if evc.is_active():
            if enable is False:  # disable if active
                with evc.lock:
                    evc.remove()
            elif redeploy is not None:  # redeploy if active
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        else:
            if enable is True:  # enable if inactive
                with evc.lock:
                    evc.deploy()
        result = {evc.id: evc.as_dict()}
        status = 200

        log.debug("update result %s %s", result, status)
        emit_event(self.controller, "updated",
                   content=map_evc_event_content(evc, **data))
        return JSONResponse(result, status_code=status)

    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
    def delete_circuit(self, request: Request) -> JSONResponse:
        """Remove a circuit.

        First, the flows are removed from the switches, and then the EVC is
        disabled.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError:
            result = f"circuit_id {circuit_id} not found"
            log.debug("delete_circuit result %s %s", result, 404)
            raise HTTPException(404, detail=result) from KeyError

        if evc.archived:
            result = f"Circuit {circuit_id} already removed"
            log.debug("delete_circuit result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        log.info("Removing %s", evc)
        with evc.lock:
            evc.remove_current_flows()
            evc.remove_failover_flows(sync=False)
            evc.deactivate()
            evc.disable()
            self.sched.remove(evc)
            evc.archive()
            evc.sync()
        log.info("EVC removed. %s", evc)
        result = {"response": f"Circuit {circuit_id} removed"}
        status = 200

        log.debug("delete_circuit result %s %s", result, status)
        emit_event(self.controller, "deleted",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)

    @rest("v2/evc/{circuit_id}/metadata", methods=["GET"])
    def get_metadata(self, request: Request) -> JSONResponse:
        """Get metadata from an EVC."""
        circuit_id = request.path_params["circuit_id"]
        try:
            return (
                JSONResponse({"metadata": self.circuits[circuit_id].metadata})
            )
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error

    # Reported issue: this code seems to be duplicated in your project.
    @rest("v2/evc/metadata", methods=["POST"])
    @validate_openapi(spec)
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to a bulk of EVCs."""
        data = get_json_or_400(request, self.controller.loop)
        circuit_ids = data.pop("circuit_ids")

        self.mongo_controller.update_evcs(circuit_ids, data, "add")

        fail_evcs = []
        for _id in circuit_ids:
            try:
                evc = self.circuits[_id]
                evc.extend_metadata(data)
            except KeyError:
                fail_evcs.append(_id)

        if fail_evcs:
            raise HTTPException(404, detail=fail_evcs)
        return JSONResponse("Operation successful", status_code=201)

    @rest("v2/evc/{circuit_id}/metadata", methods=["POST"])
    @validate_openapi(spec)
    def add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to an EVC."""
        circuit_id = request.path_params["circuit_id"]
        metadata = get_json_or_400(request, self.controller.loop)
        if not isinstance(metadata, dict):
            # f-string so the offending value is interpolated into the message
            raise HTTPException(400, f"Invalid metadata value: {metadata}")
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error

        evc.extend_metadata(metadata)
        evc.sync()
        return JSONResponse("Operation successful", status_code=201)

    # Reported issue: this code seems to be duplicated in your project.
    @rest("v2/evc/metadata/{key}", methods=["DELETE"])
    @validate_openapi(spec)
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
        """Delete metadata from a bulk of EVCs"""
        data = get_json_or_400(request, self.controller.loop)
        key = request.path_params["key"]
        circuit_ids = data.pop("circuit_ids")
        self.mongo_controller.update_evcs(circuit_ids, {key: ""}, "del")

        fail_evcs = []
        for _id in circuit_ids:
            try:
                evc = self.circuits[_id]
                evc.remove_metadata(key)
            except KeyError:
                fail_evcs.append(_id)

        if fail_evcs:
            raise HTTPException(404, detail=fail_evcs)
        return JSONResponse("Operation successful")

    @rest("v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
    def delete_metadata(self, request: Request) -> JSONResponse:
        """Delete metadata from an EVC."""
        circuit_id = request.path_params["circuit_id"]
        key = request.path_params["key"]
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error

        evc.remove_metadata(key)
        evc.sync()
        return JSONResponse("Operation successful")

    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
    def redeploy(self, request: Request) -> JSONResponse:
        """Endpoint to force the redeployment of an EVC."""
        circuit_id = request.path_params["circuit_id"]
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found"
            ) from KeyError
        if evc.is_enabled():
            with evc.lock:
                evc.remove_current_flows()
                evc.deploy()
            result = {"response": f"Circuit {circuit_id} redeploy received."}
            status = 202
        else:
            result = {"response": f"Circuit {circuit_id} is disabled."}
            status = 409

        return JSONResponse(result, status_code=status)

    @rest("/v2/evc/schedule/", methods=["POST"])
    @validate_openapi(spec)
    def create_schedule(self, request: Request) -> JSONResponse:
        """
        Create a new schedule for a given circuit.

        This service does not check for conflicts with another schedule.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }
        """
        log.debug("create_schedule /v2/evc/schedule/")
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = data["circuit_id"]
        schedule_data = data["schedule"]

        # Get EVC from circuits buffer
        circuits = self._get_circuits_buffer()

        # get the circuit
        evc = circuits.get(circuit_id)

        # circuit not found
        if not evc:
            result = f"circuit_id {circuit_id} not found"
            log.debug("create_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)
        # Can not modify circuits deleted and archived
        if evc.archived:
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
            log.debug("create_schedule result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        # new schedule from dict
        new_schedule = CircuitSchedule.from_dict(schedule_data)

        # If there is no schedule, create the list
        if not evc.circuit_scheduler:
            evc.circuit_scheduler = []

        # Add the new schedule
        evc.circuit_scheduler.append(new_schedule)

        # Add schedule job
        self.sched.add_circuit_job(evc, new_schedule)

        # save circuit to mongodb
        evc.sync()

        result = new_schedule.as_dict()
        status = 201

        log.debug("create_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)

    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update_schedule(self, request: Request) -> JSONResponse:
        """Update a schedule.

        Change all attributes of the given schedule of an EVC circuit.
        The schedule ID is preserved by default.
        Payload example:
            {
              "date": "2019-08-07T14:52:10.967Z",
              "interval": "string",
              "frequency": "1 * * * *",
              "action": "create"
            }
        """
        data = get_json_or_400(request, self.controller.loop)
        schedule_id = request.path_params["schedule_id"]
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)

        # Try to find a circuit schedule
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)

        # Can not modify circuits deleted and archived
        if not found_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("update_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)
        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("update_schedule result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        new_schedule = CircuitSchedule.from_dict(data)
        new_schedule.id = found_schedule.id
        # Remove the old schedule
        evc.circuit_scheduler.remove(found_schedule)
        # Append the modified schedule
        evc.circuit_scheduler.append(new_schedule)

        # Cancel all schedule jobs
        self.sched.cancel_job(found_schedule.id)
        # Add the new circuit schedule
        self.sched.add_circuit_job(evc, new_schedule)
        # Save EVC to mongodb
        evc.sync()

        result = new_schedule.as_dict()
        status = 200

        log.debug("update_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)

    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
    def delete_schedule(self, request: Request) -> JSONResponse:
        """Remove a circuit schedule.

        Remove the Schedule from EVC.
        Remove the Schedule from cron job.
        Save the EVC to MongoDB.
        """
        schedule_id = request.path_params["schedule_id"]
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)

        # Can not modify circuits deleted and archived
        if not found_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("delete_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("delete_schedule result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        # Remove the old schedule
        evc.circuit_scheduler.remove(found_schedule)

        # Cancel all schedule jobs
        self.sched.cancel_job(found_schedule.id)
        # Save EVC to mongodb
        evc.sync()

        result = "Schedule removed"
        status = 200

        log.debug("delete_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)

    def _is_duplicated_evc(self, evc):
        """Verify if the circuit given is duplicated with the stored evcs.

        Args:
            evc (EVC): circuit to be analysed.

        Returns:
            boolean: True if the circuit is duplicated, otherwise False.

        """
        for circuit in tuple(self.circuits.values()):
            if not circuit.archived and circuit.shares_uni(evc):
                return True
        return False

    @listen_to("kytos/topology.link_up")
    def on_link_up(self, event):
        """Change circuit when link is up or end_maintenance."""
        self.handle_link_up(event)

    def handle_link_up(self, event):
        """Change circuit when link is up or end_maintenance."""
        log.info("Event handle_link_up %s", event.content["link"])
        for evc in self.get_evcs_by_svc_level():
            if evc.is_enabled() and not evc.archived:
                with evc.lock:
                    evc.handle_link_up(event.content["link"])

    @listen_to("kytos/topology.updated")
    def on_topology_update(self, event):
        """Capture topology update event"""
        self.handle_topology_update(event)

    def handle_topology_update(self, event):
        """Handle topology update"""
        with self._lock:
            if (
                self._topology_updated_at
                and self._topology_updated_at > event.timestamp
            ):
                return
            self._topology_updated_at = event.timestamp
            for evc in self.get_evcs_by_svc_level():
                if evc.is_enabled() and not evc.archived:
                    with evc.lock:
                        evc.handle_topology_update(
                            event.content["topology"].switches
                        )

    @listen_to("kytos/topology.link_down")
    def on_link_down(self, event):
        """Change circuit when link is down or under_maintenance."""
        self.handle_link_down(event)

    def handle_link_down(self, event):
        """Change circuit when link is down or under_maintenance."""
        link = event.content["link"]
        log.info("Event handle_link_down %s", link)
        switch_flows = {}
        evcs_with_failover = []
        evcs_normal = []
        check_failover = []
        for evc in self.get_evcs_by_svc_level():
            if evc.is_affected_by_link(link):
                # If there is no usable failover path, handle link down the
                # traditional way
                if (
                    not getattr(evc, 'failover_path', None) or
                    evc.is_failover_path_affected_by_link(link)
                ):
                    evcs_normal.append(evc)
                    continue
                for dpid, flows in evc.get_failover_flows().items():
                    switch_flows.setdefault(dpid, [])
                    switch_flows[dpid].extend(flows)
                evcs_with_failover.append(evc)
            else:
                check_failover.append(evc)

        while switch_flows:
            offset = settings.BATCH_SIZE or None
            switches = list(switch_flows.keys())
            for dpid in switches:
                emit_event(
                    self.controller,
                    context="kytos.flow_manager",
                    name="flows.install",
                    content={
                        "dpid": dpid,
                        "flow_dict": {"flows": switch_flows[dpid][:offset]},
                    }
                )
                if offset is None or offset >= len(switch_flows[dpid]):
                    del switch_flows[dpid]
                    continue
                switch_flows[dpid] = switch_flows[dpid][offset:]
            time.sleep(settings.BATCH_INTERVAL)

        for evc in evcs_with_failover:
            with evc.lock:
                old_path = evc.current_path
                evc.current_path = evc.failover_path
                evc.failover_path = old_path
                evc.sync()
            emit_event(self.controller, "redeployed_link_down",
                       content=map_evc_event_content(evc))
            log.info(
                f"{evc} redeployed with failover due to link down {link.id}"
            )

        for evc in evcs_normal:
            emit_event(
                self.controller,
                "evc_affected_by_link_down",
                content={"link_id": link.id} | map_evc_event_content(evc)
            )

        # After handling the hot path, check if new failover paths are needed.
        # Note that EVCs affected by link down will generate a KytosEvent for
        # deployed|redeployed, which will trigger the failover path setup.
        # Thus, we just need to further check the check_failover list
        for evc in check_failover:
            if evc.is_failover_path_affected_by_link(link):
                evc.setup_failover_path()

    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
    def on_evc_affected_by_link_down(self, event):
        """Change circuit when link is down or under_maintenance."""
        self.handle_evc_affected_by_link_down(event)

    def handle_evc_affected_by_link_down(self, event):
        """Change circuit when link is down or under_maintenance."""
        evc = self.circuits.get(event.content["evc_id"])
        link_id = event.content['link_id']
        if not evc:
            return
        with evc.lock:
            result = evc.handle_link_down()
        event_name = "error_redeploy_link_down"
        if result:
            log.info(f"{evc} redeployed due to link down {link_id}")
            event_name = "redeployed_link_down"
        emit_event(self.controller, event_name,
                   content=map_evc_event_content(evc))

    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
    def on_evc_deployed(self, event):
        """Handle EVC deployed|redeployed_link_(up|down)."""
        self.handle_evc_deployed(event)

    def handle_evc_deployed(self, event):
        """Setup failover path on evc deployed."""
        evc = self.circuits.get(event.content["evc_id"])
        if not evc:
            return
        with evc.lock:
            evc.setup_failover_path()

    @listen_to("kytos/topology.topology_loaded")
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
        """Load EVCs once the topology is available."""
        self.load_all_evcs()

    def load_all_evcs(self):
        """Try to load all EVCs on startup."""
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
        for circuit_id, circuit in circuits:
            if circuit_id not in self.circuits:
                self._load_evc(circuit)

    def _load_evc(self, circuit_dict):
        """Load one EVC from mongodb to memory."""
        try:
            evc = self._evc_from_dict(circuit_dict)
        except ValueError as exception:
            log.error(
                f"Could not load EVC: dict={circuit_dict} error={exception}"
            )
            return None

        if evc.archived:
            return None
        evc.deactivate()
        evc.sync()
        self.circuits.setdefault(evc.id, evc)
        self.sched.add(evc)
        return evc

    @listen_to("kytos/flow_manager.flow.error")
    def on_flow_mod_error(self, event):
        """Handle flow mod errors related to an EVC."""
        self.handle_flow_mod_error(event)

    def handle_flow_mod_error(self, event):
        """Handle flow mod errors related to an EVC."""
        flow = event.content["flow"]
        command = event.content.get("error_command")
        if command != "add":
            return
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
        if evc:
            evc.remove_current_flows()

    def _evc_dict_with_instances(self, evc_dict):
        """Convert some dict values to instance of EVC classes.

        This method will convert: [UNI, Link]
        """
        data = evc_dict.copy()  # Do not modify the original dict
        for attribute, value in data.items():
            # Get multiple attributes.
            # Ex: uni_a, uni_z
            if "uni" in attribute:
                try:
                    data[attribute] = self._uni_from_dict(value)
                except ValueError as exception:
                    result = "Error creating UNI: Invalid value"
                    raise ValueError(result) from exception

            if attribute == "circuit_scheduler":
                data[attribute] = []
                for schedule in value:
                    data[attribute].append(CircuitSchedule.from_dict(schedule))

            # Get multiple attributes.
            # Ex: primary_links,
            #     backup_links,
            #     current_links_cache,
            #     primary_links_cache,
            #     backup_links_cache
            if "links" in attribute:
                data[attribute] = [
                    self._link_from_dict(link) for link in value
                ]

            # Ex: current_path,
            #     primary_path,
            #     backup_path
            if "path" in attribute and attribute != "dynamic_backup_path":
                data[attribute] = Path(
                    [self._link_from_dict(link) for link in value]
                )

        return data

    def _evc_from_dict(self, evc_dict):
        data = self._evc_dict_with_instances(evc_dict)
        data["table_group"] = self.table_group
        return EVC(self.controller, **data)

    def _uni_from_dict(self, uni_dict):
        """Return a UNI object from python dict."""
        if uni_dict is None:
            return False

        interface_id = uni_dict.get("interface_id")
        interface = self.controller.get_interface_by_id(interface_id)
        if interface is None:
            result = (
                "Error creating UNI:"
                + f"Could not instantiate interface {interface_id}"
            )
            raise ValueError(result) from ValueError

        tag_dict = uni_dict.get("tag", None)
        if tag_dict:
            tag = TAG.from_dict(tag_dict)
        else:
            tag = None
        uni = UNI(interface, tag)

        return uni

    def _link_from_dict(self, link_dict):
        """Return a Link object from python dict."""
        id_a = link_dict.get("endpoint_a").get("id")
        id_b = link_dict.get("endpoint_b").get("id")

        endpoint_a = self.controller.get_interface_by_id(id_a)
        endpoint_b = self.controller.get_interface_by_id(id_b)
        if not endpoint_a:
            error_msg = f"Could not get interface endpoint_a id {id_a}"
            raise ValueError(error_msg)
        if not endpoint_b:
            error_msg = f"Could not get interface endpoint_b id {id_b}"
            raise ValueError(error_msg)

        link = Link(endpoint_a, endpoint_b)
        if "metadata" in link_dict:
            link.extend_metadata(link_dict.get("metadata"))

        s_vlan = link.get_metadata("s_vlan")
        if s_vlan:
            tag = TAG.from_dict(s_vlan)
            if tag is False:
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
                raise ValueError(error_msg)
            link.update_metadata("s_vlan", tag)
        return link

    def _find_evc_by_schedule_id(self, schedule_id):
        """
        Find an EVC and CircuitSchedule based on schedule_id.

        :param schedule_id: Schedule ID
        :return: EVC and Schedule
        """
        circuits = self._get_circuits_buffer()
        found_schedule = None
        evc = None

        # pylint: disable=unused-variable
        for c_id, circuit in circuits.items():
            for schedule in circuit.circuit_scheduler:
                if schedule.id == schedule_id:
                    found_schedule = schedule
                    evc = circuit
                    break
            if found_schedule:
                break
        return evc, found_schedule

    def _get_circuits_buffer(self):
        """
        Return the circuit buffer.

        If the buffer is empty, try to load data from mongodb.
        """
        if not self.circuits:
            # Load circuits from mongodb to buffer
            circuits = self.mongo_controller.get_circuits()['circuits']
            for c_id, circuit in circuits.items():
                evc = self._evc_from_dict(circuit)
                self.circuits[c_id] = evc
        return self.circuits

    # pylint: disable=attribute-defined-outside-init
    @alisten_to("kytos/of_multi_table.enable_table")
    async def on_table_enabled(self, event):
        """Handle a recently enabled table."""
        table_group = event.content.get("mef_eline", None)
        if not table_group:
            return
        for group in table_group:
            if group not in settings.TABLE_GROUP_ALLOWED:
                log.error(f'The table group "{group}" is not allowed for '
                          f'mef_eline. Allowed table groups are '
                          f'{settings.TABLE_GROUP_ALLOWED}')
                return
        self.table_group.update(table_group)
        content = {"group_table": self.table_group}
        name = "kytos/mef_eline.enable_table"
        await aemit_event(self.controller, name, content)
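
The batching loop in `handle_link_down` sends failover flows in rounds: each round emits at most `settings.BATCH_SIZE` flows per switch, drops switches that have nothing left, and sleeps `settings.BATCH_INTERVAL` before the next round. The sketch below isolates that slicing logic so it can be read and tested on its own. It is an illustration only: `batch_flows` is a hypothetical helper that does not exist in mef_eline, it has no Kytos dependency, and it leaves event emission and sleeping to the caller.

```python
from typing import Dict, Iterator, List, Optional, Tuple


def batch_flows(
    switch_flows: Dict[str, List[dict]],
    batch_size: Optional[int],
) -> Iterator[List[Tuple[str, List[dict]]]]:
    """Yield rounds of (dpid, flows) pairs, at most batch_size flows per dpid.

    Hypothetical helper, not part of mef_eline. A falsy batch_size means
    "send everything in one round", mirroring `settings.BATCH_SIZE or None`.
    """
    # Work on a copy so the caller's dict is left untouched.
    pending = {dpid: list(flows) for dpid, flows in switch_flows.items()}
    offset = batch_size or None
    while pending:
        current_round = []
        for dpid in list(pending):
            current_round.append((dpid, pending[dpid][:offset]))
            if offset is None or offset >= len(pending[dpid]):
                # Everything for this switch has been handed out.
                del pending[dpid]
                continue
            pending[dpid] = pending[dpid][offset:]
        yield current_round


flows = {"sw1": [{"id": 1}, {"id": 2}, {"id": 3}], "sw2": [{"id": 4}]}
rounds = list(batch_flows(flows, 2))
single_round = list(batch_flows(flows, 0))
```

In the handler, each yielded round would correspond to one pass of the `for dpid in switches` loop, followed by `time.sleep(settings.BATCH_INTERVAL)`. Pulling the slicing out of the handler like this is one possible way to reduce the complexity score reported for `Main.handle_link_down()`, though whether it fits the project's conventions is a judgment call for the maintainers.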