Passed
Pull Request — master (#433)
by
unknown
04:15
created

build.main   F

Complexity

Total Complexity 205

Size/Duplication

Total Lines 1131
Duplicated Lines 3.54 %

Test Coverage

Coverage 93.99%

Importance

Changes 0
Metric Value
eloc 752
dl 40
loc 1131
ccs 626
cts 666
cp 0.9399
rs 1.848
c 0
b 0
f 0
wmc 205

50 Methods

Rating   Name   Duplication   Size   Complexity  
A Main._use_uni_tags() 0 10 2
A Main.list_circuits() 0 15 1
A Main.add_metadata() 0 19 3
A Main.on_flow_delete() 0 4 1
C Main._evc_dict_with_instances() 0 41 9
A Main.on_link_down() 0 4 1
A Main.update_schedule() 0 47 2
A Main.handle_evc_deployed() 0 7 3
A Main._find_evc_by_schedule_id() 0 21 5
A Main._evc_from_dict() 0 4 1
A Main.load_all_evcs() 0 8 3
A Main.handle_interface_link_up() 0 9 3
A Main.get_metadata() 0 13 2
A Main.handle_link_up() 0 7 5
A Main.delete_metadata() 0 16 2
A Main.on_link_up() 0 4 1
A Main.on_evc_affected_by_link_down() 0 4 1
A Main.delete_schedule() 0 31 2
A Main.get_circuit() 0 13 2
A Main.shutdown() 0 2 1
A Main.handle_evc_affected_by_link_down() 0 14 4
A Main.on_evc_deployed() 0 4 1
A Main.setup() 0 31 1
F Main.handle_link_down() 0 81 17
A Main.execute() 0 8 3
A Main.get_evcs_by_svc_level() 0 10 3
A Main.get_eline_controller() 0 4 1
F Main.create_circuit() 0 113 14
B Main.list_schedules() 0 31 5
C Main.should_be_checked() 0 16 9
B Main._link_from_dict() 0 26 6
B Main._uni_from_dict() 0 29 5
A Main.redeploy() 0 23 4
A Main.handle_flow_mod_error() 0 10 4
F Main.update() 0 61 15
C Main.execute_consistency() 0 26 9
A Main.handle_interface_link_down() 0 9 3
B Main.handle_on_interface_link_change() 0 32 8
A Main.handle_flow_delete() 0 7 2
A Main.bulk_add_metadata() 20 20 4
A Main.on_flow_mod_error() 0 4 1
A Main.on_topology_loaded() 0 4 1
A Main.bulk_delete_metadata() 20 20 4
F Main._check_no_tag_duplication() 0 25 14
A Main._load_evc() 0 15 3
A Main.on_table_enabled() 0 16 4
A Main.create_schedule() 0 56 3
A Main._get_circuits_buffer() 0 13 3
A Main.on_interface_link_change() 0 8 1
A Main.delete_circuit() 0 34 3


Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
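As a hedged illustration of removing this kind of duplication: in the method table above, Main.bulk_add_metadata() and Main.bulk_delete_metadata() each carry 20 duplicated lines, and both loop over circuit ids, apply one operation, and collect the ids that were not found. The sketch below factors that loop into one helper. `apply_to_circuits` and `FakeEVC` are hypothetical names introduced only for this example, not part of the NApp, and the `try` is narrowed to the dictionary lookup, which differs slightly from the original handlers.

```python
from typing import Callable, Dict, Iterable, List


class FakeEVC:
    """Stand-in for an EVC (assumption); only metadata behaviour is modelled."""

    def __init__(self) -> None:
        self.metadata: Dict[str, str] = {}

    def extend_metadata(self, data: Dict[str, str]) -> None:
        self.metadata.update(data)

    def remove_metadata(self, key: str) -> None:
        self.metadata.pop(key, None)


def apply_to_circuits(
    circuits: Dict[str, FakeEVC],
    circuit_ids: Iterable[str],
    action: Callable[[FakeEVC], None],
) -> List[str]:
    """Apply `action` to every known circuit; return the ids not found."""
    missing: List[str] = []
    for circuit_id in circuit_ids:
        try:
            evc = circuits[circuit_id]
        except KeyError:
            missing.append(circuit_id)
            continue
        action(evc)
    return missing


circuits = {"a": FakeEVC(), "b": FakeEVC()}

# "bulk add" and "bulk delete" now differ only in the action they pass in.
missing_on_add = apply_to_circuits(
    circuits, ["a", "b", "zz"], lambda evc: evc.extend_metadata({"owner": "noc"})
)
missing_on_del = apply_to_circuits(
    circuits, ["a"], lambda evc: evc.remove_metadata("owner")
)
```

With a helper like this, each REST handler would keep only its own request parsing and response code, and the shared loop would live in one place.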

Common duplication problems, and corresponding solutions are:

Complexity

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like build.main often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
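A rough way to try the prefix/suffix heuristic on this class is to group its method names by their last word. The sketch below does that for a subset of the names in the method table above (with the `Main.` prefix and parentheses dropped). `group_by_suffix` is a hypothetical helper, and the plural handling is deliberately crude (it only drops one trailing "s").

```python
from collections import defaultdict
from typing import Dict, Iterable, List

# Method names copied from the report's method table (a subset).
METHOD_NAMES = [
    "get_metadata", "add_metadata", "delete_metadata",
    "bulk_add_metadata", "bulk_delete_metadata",
    "list_schedules", "create_schedule", "update_schedule", "delete_schedule",
    "list_circuits", "get_circuit", "create_circuit", "delete_circuit",
    "setup", "shutdown", "redeploy",
]


def group_by_suffix(names: Iterable[str]) -> Dict[str, List[str]]:
    """Group names by their last underscore-separated word."""
    groups: Dict[str, List[str]] = defaultdict(list)
    for name in names:
        suffix = name.rsplit("_", 1)[-1]
        if suffix.endswith("s"):  # crude plural strip: "schedules" -> "schedule"
            suffix = suffix[:-1]
        groups[suffix].append(name)
    # Only suffixes shared by several methods hint at a cohesive component.
    return {suffix: found for suffix, found in groups.items() if len(found) > 1}


candidates = group_by_suffix(METHOD_NAMES)
for suffix, found in candidates.items():
    print(f"{suffix}: {len(found)} methods")
```

On this subset the shared suffixes are "metadata", "schedule" and "circuit", which suggests three candidate components. Whether they are really cohesive still has to be judged from the code.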

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
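A minimal sketch of the Extract Class step, assuming the metadata operations are the group being moved out. `MetadataManager` and `SlimMain` are hypothetical names, and circuits are simplified here to plain dictionaries mapping a circuit id to its metadata. This is not the NApp's actual data model.

```python
from typing import Any, Dict


class MetadataManager:
    """Hypothetical extracted class: owns the metadata operations."""

    def __init__(self, circuits: Dict[str, Dict[str, Any]]) -> None:
        # Shares the same dict object as the owning class, so both stay in sync.
        self._circuits = circuits

    def get(self, circuit_id: str) -> Dict[str, Any]:
        return dict(self._circuits[circuit_id])

    def add(self, circuit_id: str, metadata: Dict[str, Any]) -> None:
        self._circuits[circuit_id].update(metadata)

    def delete(self, circuit_id: str, key: str) -> None:
        self._circuits[circuit_id].pop(key, None)


class SlimMain:
    """What is left of the large class: it delegates instead of implementing."""

    def __init__(self) -> None:
        self.circuits: Dict[str, Dict[str, Any]] = {}
        self.metadata = MetadataManager(self.circuits)

    def add_metadata(self, circuit_id: str, metadata: Dict[str, Any]) -> None:
        self.metadata.add(circuit_id, metadata)


app = SlimMain()
app.circuits["evc-1"] = {}
app.add_metadata("evc-1", {"owner": "noc"})
app.metadata.delete("evc-1", "missing-key")  # absent keys are ignored
```

The large class keeps thin delegating methods, so its public interface does not change while the extracted class can be tested on its own.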

# pylint: disable=protected-access, too-many-lines
"""Main module of kytos/mef_eline Kytos Network Application.

NApp to provision circuits from user request.
"""
import pathlib
import time
import traceback
from collections import defaultdict
from threading import Lock
from typing import Optional

from pydantic import ValidationError

from kytos.core import KytosNApp, log, rest
from kytos.core.events import KytosEvent
from kytos.core.exceptions import KytosTagError
from kytos.core.helpers import (alisten_to, listen_to, load_spec, now,
                                validate_openapi)
from kytos.core.interface import TAG, UNI, TAGRange
from kytos.core.link import Link
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
                                 get_json_or_400)
from kytos.core.tag_ranges import get_tag_ranges
from napps.kytos.mef_eline import controllers, settings
from napps.kytos.mef_eline.exceptions import (DisabledSwitch,
                                              DuplicatedNoTagUNI, InvalidPath)
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
                                          Path)
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
                                         emit_event, get_vlan_tags_and_masks,
                                         map_evc_event_content)


# pylint: disable=too-many-public-methods
class Main(KytosNApp):
    """Main class of kytos/mef_eline NApp.

    This class is the entry point for this NApp.
    """

    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")

    def setup(self):
        """Replace the '__init__' method for the KytosNApp subclass.

        The setup method is automatically called by the controller when your
        application is loaded.

        So, if you have any setup routine, insert it here.
        """
        # object used to schedule circuit events
        self.sched = Scheduler()

        # object to save and load circuits
        self.mongo_controller = self.get_eline_controller()
        self.mongo_controller.bootstrap_indexes()

        # set the controller that will manage the dynamic paths
        DynamicPathManager.set_controller(self.controller)

        # dictionary of EVCs created. It acts as a circuit buffer.
        # Every create/update/delete must be synced to mongodb.
        self.circuits = {}

        self._intf_events = defaultdict(dict)
        self._lock_interfaces = defaultdict(Lock)
        self._lock_circuits = Lock()
        self.table_group = {"epl": 0, "evpl": 0}
        self._lock = Lock()
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

        self.load_all_evcs()
        self._topology_updated_at = None

    def get_evcs_by_svc_level(self) -> list:
        """Get circuits sorted by desc service level and asc creation_time.

        In the future, as more ops are offloaded, it should be fetched from
        the DB.
        """
        with self._lock_circuits:
            evcs = {i: j for i, j in self.circuits.items() if j.is_enabled()}
        my_sort = sorted(evcs.values(),
                         key=lambda x: (-x.service_level, x.creation_time))
        return my_sort

    @staticmethod
    def get_eline_controller():
        """Return the ELineController instance."""
        return controllers.ELineController()

    def execute(self):
        """Run the consistency routine unless a previous run holds the lock."""
        if self._lock.locked():
            return
        log.debug("Starting consistency routine")
        with self._lock:
            self.execute_consistency()
        log.debug("Finished consistency routine")

    def should_be_checked(self, circuit):
        "Verify if the circuit meets the necessary conditions to be checked"
        # pylint: disable=too-many-boolean-expressions
        if (
                circuit.is_enabled()
                and not circuit.is_active()
                and not circuit.lock.locked()
                and not circuit.has_recent_removed_flow()
                and not circuit.is_recent_updated()
                and circuit.are_unis_active(self.controller.switches)
                # if an inter-switch EVC does not have current_path, it does
                # not make sense to run sdntrace on it
                and (circuit.is_intra_switch() or circuit.current_path)
                ):
            return True
        return False

    def execute_consistency(self):
        """Execute consistency routine."""
        circuits_to_check = []
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
        for circuit in self.get_evcs_by_svc_level():
            stored_circuits.pop(circuit.id, None)
            if self.should_be_checked(circuit):
                circuits_to_check.append(circuit)
        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
        for circuit in circuits_to_check:
            is_checked = circuits_checked.get(circuit.id)
            if is_checked:
                circuit.execution_rounds = 0
                log.info(f"{circuit} enabled but inactive - activating")
                with circuit.lock:
                    circuit.activate()
                    circuit.sync()
            else:
                circuit.execution_rounds += 1
                if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
                    log.info(f"{circuit} enabled but inactive - redeploy")
                    with circuit.lock:
                        circuit.deploy()
        for circuit_id in stored_circuits:
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
            self._load_evc(stored_circuits[circuit_id])

    def shutdown(self):
        """Execute when your napp is unloaded.

        If you have some cleanup procedure, insert it here.
        """

    @rest("/v2/evc/", methods=["GET"])
    def list_circuits(self, request: Request) -> JSONResponse:
        """Endpoint to return circuits stored.

        If the `archived` query arg is defined (not null), circuits are
        filtered accordingly; by default only non-archived EVCs are listed.
        """
        log.debug("list_circuits /v2/evc")
        args = request.query_params
        archived = args.get("archived", "false").lower()
        args = {k: v for k, v in args.items() if k not in {"archived"}}
        circuits = self.mongo_controller.get_circuits(archived=archived,
                                                      metadata=args)
        circuits = circuits['circuits']
        return JSONResponse(circuits)

    @rest("/v2/evc/schedule", methods=["GET"])
    def list_schedules(self, _request: Request) -> JSONResponse:
        """Endpoint to return all schedules stored for all circuits.

        Return a JSON with the following template:
        [{"schedule_id": <schedule_id>,
         "circuit_id": <circuit_id>,
         "schedule": <schedule object>}]
        """
        log.debug("list_schedules /v2/evc/schedule")
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
        if not circuits:
            result = {}
            status = 200
            return JSONResponse(result, status_code=status)

        result = []
        status = 200
        for circuit in circuits:
            circuit_scheduler = circuit.get("circuit_scheduler")
            if circuit_scheduler:
                for scheduler in circuit_scheduler:
                    value = {
                        "schedule_id": scheduler.get("id"),
                        "circuit_id": circuit.get("id"),
                        "schedule": scheduler,
                    }
                    result.append(value)

        log.debug("list_schedules result %s %s", result, status)
        return JSONResponse(result, status_code=status)

    @rest("/v2/evc/{circuit_id}", methods=["GET"])
    def get_circuit(self, request: Request) -> JSONResponse:
        """Endpoint to return a circuit based on id."""
        circuit_id = request.path_params["circuit_id"]
        log.debug("get_circuit /v2/evc/%s", circuit_id)
        circuit = self.mongo_controller.get_circuit(circuit_id)
        if not circuit:
            result = f"circuit_id {circuit_id} not found"
            log.debug("get_circuit result %s %s", result, 404)
            raise HTTPException(404, detail=result)
        status = 200
        log.debug("get_circuit result %s %s", circuit, status)
        return JSONResponse(circuit, status_code=status)

    # pylint: disable=too-many-branches, too-many-statements
    @rest("/v2/evc/", methods=["POST"])
    @validate_openapi(spec)
    def create_circuit(self, request: Request) -> JSONResponse:
        """Try to create a new circuit.

        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
        UNI_Z's requested C-VID are available from the interfaces' pools. This
        is checked when creating the UNI object.

        Then, E-Line NApp requests a primary and a backup path from the
        Pathfinder NApp using the attributes primary_links and backup_links
        submitted via REST.

        # For each link composing paths in #3:
        #  - E-Line NApp requests an S-VID available from the link VLAN pool.
        #  - Using the S-VID obtained, generate abstract flow entries to be
        #    sent to FlowManager

        Push abstract flow entries to FlowManager and FlowManager pushes
        OpenFlow entries to datapaths.

        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
        creation.

        Finally, notify the user of the status of their request.
        """
        # Try to create the circuit object
        log.debug("create_circuit /v2/evc/")
        data = get_json_or_400(request, self.controller.loop)

        try:
            evc = self._evc_from_dict(data)
        except (ValueError, KytosTagError) as exception:
            log.debug("create_circuit result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception
        try:
            check_disabled_component(evc.uni_a, evc.uni_z)
        except DisabledSwitch as exception:
            log.debug("create_circuit result %s %s", exception, 409)
            raise HTTPException(
                    409,
                    detail=f"Path is not valid: {exception}"
                ) from exception

        if evc.primary_path:
            try:
                evc.primary_path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise HTTPException(
                    400,
                    detail=f"primary_path is not valid: {exception}"
                ) from exception
        if evc.backup_path:
            try:
                evc.backup_path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise HTTPException(
                    400,
                    detail=f"backup_path is not valid: {exception}"
                ) from exception

        if not evc._tag_lists_equal():
            detail = "UNI_A and UNI_Z tag lists should be the same."
            raise HTTPException(400, detail=detail)

        try:
            evc._validate_has_primary_or_dynamic()
        except ValueError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        try:
            self._check_no_tag_duplication(evc.id, evc.uni_a, evc.uni_z)
        except DuplicatedNoTagUNI as exception:
            log.debug("create_circuit result %s %s", exception, 409)
            raise HTTPException(409, detail=str(exception)) from exception

        try:
            self._use_uni_tags(evc)
        except KytosTagError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        # save circuit
        try:
            evc.sync()
        except ValidationError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        # store circuit in dictionary
        self.circuits[evc.id] = evc

        # Schedule the circuit deploy
        self.sched.add(evc)

        # Circuit has no schedule, deploy now
        if not evc.circuit_scheduler:
            with evc.lock:
                evc.deploy()

        # Notify users
        result = {"circuit_id": evc.id}
        status = 201
        log.debug("create_circuit result %s %s", result, status)
        emit_event(self.controller, name="created",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)

    @staticmethod
    def _use_uni_tags(evc):
        uni_a = evc.uni_a
        evc._use_uni_vlan(uni_a)
        try:
            uni_z = evc.uni_z
            evc._use_uni_vlan(uni_z)
        except KytosTagError as err:
            evc.make_uni_vlan_available(uni_a)
            raise err

    @listen_to('kytos/flow_manager.flow.removed')
    def on_flow_delete(self, event):
        """Capture delete messages to keep track when flows got removed."""
        self.handle_flow_delete(event)

    def handle_flow_delete(self, event):
        """Keep track when the EVC got flows removed by deriving its cookie."""
        flow = event.content["flow"]
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
        if evc:
            log.debug("Flow removed in EVC %s", evc.id)
            evc.set_flow_removed_at()

    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update(self, request: Request) -> JSONResponse:
        """Update a circuit based on payload.

        The EVC attributes (creation_time, active, current_path,
        failover_path, _id, archived) can't be updated.
        """
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = request.path_params["circuit_id"]
        log.debug("update /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("update result %s %s", result, 404)
            # chain from the caught instance rather than the KeyError class
            raise HTTPException(404, detail=result) from error

        try:
            updated_data = self._evc_dict_with_instances(data)
            self._check_no_tag_duplication(
                circuit_id, updated_data.get("uni_a"),
                updated_data.get("uni_z")
            )
            enable, redeploy = evc.update(**updated_data)
        except (ValueError, KytosTagError, ValidationError) as exception:
            log.debug("update result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception
        except DuplicatedNoTagUNI as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(409, detail=str(exception)) from exception
        except DisabledSwitch as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(
                    409,
                    detail=f"Path is not valid: {exception}"
                ) from exception

        if evc.is_active():
            if enable is False:  # disable if active
                with evc.lock:
                    evc.remove()
            elif redeploy is not None:  # redeploy if active
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        else:
            if enable is True:  # enable if inactive
                with evc.lock:
                    evc.deploy()
            elif evc.is_enabled() and redeploy:
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        result = {evc.id: evc.as_dict()}
        status = 200

        log.debug("update result %s %s", result, status)
        emit_event(self.controller, "updated",
                   content=map_evc_event_content(evc, **data))
        return JSONResponse(result, status_code=status)

    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
    def delete_circuit(self, request: Request) -> JSONResponse:
        """Remove a circuit.

        First, the flows are removed from the switches, and then the EVC is
        disabled and archived.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits.pop(circuit_id)
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("delete_circuit result %s %s", result, 404)
            # chain from the caught instance rather than the KeyError class
            raise HTTPException(404, detail=result) from error

        log.info("Removing %s", evc)
        with evc.lock:
            evc.remove_current_flows()
            evc.remove_failover_flows(sync=False)
            evc.deactivate()
            evc.disable()
            self.sched.remove(evc)
            evc.archive()
            evc.remove_uni_tags()
            evc.sync()
        log.info("EVC removed. %s", evc)
        result = {"response": f"Circuit {circuit_id} removed"}
        status = 200

        log.debug("delete_circuit result %s %s", result, status)
        emit_event(self.controller, "deleted",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)

    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
    def get_metadata(self, request: Request) -> JSONResponse:
        """Get metadata from an EVC."""
        circuit_id = request.path_params["circuit_id"]
        try:
            return (
                JSONResponse({"metadata": self.circuits[circuit_id].metadata})
            )
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error

    @rest("/v2/evc/metadata", methods=["POST"])
    @validate_openapi(spec)
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to a bulk of EVCs."""
        data = get_json_or_400(request, self.controller.loop)
        circuit_ids = data.pop("circuit_ids")

        self.mongo_controller.update_evcs(circuit_ids, data, "add")

        fail_evcs = []
        for _id in circuit_ids:
            try:
                evc = self.circuits[_id]
                evc.extend_metadata(data)
            except KeyError:
                fail_evcs.append(_id)

        if fail_evcs:
            raise HTTPException(404, detail=fail_evcs)
        return JSONResponse("Operation successful", status_code=201)

    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
    @validate_openapi(spec)
    def add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to an EVC."""
        circuit_id = request.path_params["circuit_id"]
        metadata = get_json_or_400(request, self.controller.loop)
        if not isinstance(metadata, dict):
            # f-prefix added so the offending value is interpolated
            raise HTTPException(400, f"Invalid metadata value: {metadata}")
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error

        evc.extend_metadata(metadata)
        evc.sync()
        return JSONResponse("Operation successful", status_code=201)

    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
    @validate_openapi(spec)
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
        """Delete metadata from a bulk of EVCs."""
        data = get_json_or_400(request, self.controller.loop)
        key = request.path_params["key"]
        circuit_ids = data.pop("circuit_ids")
        self.mongo_controller.update_evcs(circuit_ids, {key: ""}, "del")

        fail_evcs = []
        for _id in circuit_ids:
            try:
                evc = self.circuits[_id]
                evc.remove_metadata(key)
            except KeyError:
                fail_evcs.append(_id)

        if fail_evcs:
            raise HTTPException(404, detail=fail_evcs)
        return JSONResponse("Operation successful")

    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
    def delete_metadata(self, request: Request) -> JSONResponse:
        """Delete metadata from an EVC."""
        circuit_id = request.path_params["circuit_id"]
        key = request.path_params["key"]
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error

        evc.remove_metadata(key)
        evc.sync()
        return JSONResponse("Operation successful")

    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
    def redeploy(self, request: Request) -> JSONResponse:
        """Endpoint to force the redeployment of an EVC."""
        circuit_id = request.path_params["circuit_id"]
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            # chain from the caught instance rather than the KeyError class
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found"
            ) from error
        if evc.is_enabled():
            with evc.lock:
                evc.remove_current_flows()
                evc.deploy()
            result = {"response": f"Circuit {circuit_id} redeploy received."}
            status = 202
        else:
            result = {"response": f"Circuit {circuit_id} is disabled."}
            status = 409

        return JSONResponse(result, status_code=status)

    @rest("/v2/evc/schedule/", methods=["POST"])
    @validate_openapi(spec)
    def create_schedule(self, request: Request) -> JSONResponse:
        """
        Create a new schedule for a given circuit.

        This service does not check if there are conflicts with another
        schedule.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }
        """
        log.debug("create_schedule /v2/evc/schedule/")
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = data["circuit_id"]
        schedule_data = data["schedule"]

        # Get EVC from circuits buffer
        circuits = self._get_circuits_buffer()

        # get the circuit
        evc = circuits.get(circuit_id)

        # the circuit must exist
        if not evc:
            result = f"circuit_id {circuit_id} not found"
            log.debug("create_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # new schedule from dict
        new_schedule = CircuitSchedule.from_dict(schedule_data)

        # If there is no schedule, create the list
        if not evc.circuit_scheduler:
            evc.circuit_scheduler = []

        # Add the new schedule
        evc.circuit_scheduler.append(new_schedule)

        # Add schedule job
        self.sched.add_circuit_job(evc, new_schedule)

        # save circuit to mongodb
        evc.sync()

        result = new_schedule.as_dict()
        status = 201

        log.debug("create_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)

    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update_schedule(self, request: Request) -> JSONResponse:
627
        """Update a schedule.
628
629
        Change all attributes from the given schedule from a EVC circuit.
630
        The schedule ID is preserved as default.
631
        Payload example:
632
            {
633
              "date": "2019-08-07T14:52:10.967Z",
634
              "interval": "string",
635
              "frequency": "1 * * *",
636
              "action": "create"
637
            }
638
        """
639 1
        data = get_json_or_400(request, self.controller.loop)
640 1
        schedule_id = request.path_params["schedule_id"]
641 1
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)
642
643
        # Try to find a circuit schedule
644 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
645
646
        # Can not modify circuits deleted and archived
647 1
        if not found_schedule:
648 1
            result = f"schedule_id {schedule_id} not found"
649 1
            log.debug("update_schedule result %s %s", result, 404)
650 1
            raise HTTPException(404, detail=result)
651
652 1
        new_schedule = CircuitSchedule.from_dict(data)
653 1
        new_schedule.id = found_schedule.id
654
        # Remove the old schedule
655 1
        evc.circuit_scheduler.remove(found_schedule)
656
        # Append the modified schedule
657 1
        evc.circuit_scheduler.append(new_schedule)
658
659
        # Cancel all schedule jobs
660 1
        self.sched.cancel_job(found_schedule.id)
661
        # Add the new circuit schedule
662 1
        self.sched.add_circuit_job(evc, new_schedule)
663
        # Save EVC to mongodb
664 1
        evc.sync()
665
666 1
        result = new_schedule.as_dict()
667 1
        status = 200
668
669 1
        log.debug("update_schedule result %s %s", result, status)
670 1
        return JSONResponse(result, status_code=status)

    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
    def delete_schedule(self, request: Request) -> JSONResponse:
        """Remove a circuit schedule.

        Remove the Schedule from EVC.
        Remove the Schedule from cron job.
        Save the EVC to MongoDB.
        """
        schedule_id = request.path_params["schedule_id"]
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)

        # Cannot modify circuits that were deleted or archived
        if not found_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("delete_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # Remove the old schedule
        evc.circuit_scheduler.remove(found_schedule)

        # Cancel the job of the removed schedule
        self.sched.cancel_job(found_schedule.id)
        # Save EVC to mongodb
        evc.sync()

        result = "Schedule removed"
        status = 200

        log.debug("delete_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)

    def _check_no_tag_duplication(
        self,
        evc_id: str,
        uni_a: Optional[UNI] = None,
        uni_z: Optional[UNI] = None
    ):
        """Check whether the given UNIs have no tag and whether they are
        duplicated. Raise DuplicatedNoTagUNI if duplication is found.

        Args:
            evc_id (str): ID of the EVC being analyzed; it is skipped
                when scanning the other circuits.
            uni_a (Optional[UNI]): UNI A of the EVC, if any.
            uni_z (Optional[UNI]): UNI Z of the EVC, if any.
        """

        # No UNIs
        if not (uni_a or uni_z):
            return

        # Every given UNI has a user tag, so there is nothing to check
        if (not (uni_a and not uni_a.user_tag) and
                not (uni_z and not uni_z.user_tag)):
            return
        for circuit in self.circuits.copy().values():
            if (not circuit.archived and circuit._id != evc_id):
                if uni_a and uni_a.user_tag is None:
                    circuit.check_no_tag_duplicate(uni_a)
                if uni_z and uni_z.user_tag is None:
                    circuit.check_no_tag_duplicate(uni_z)
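
# A minimal sketch of the early-return guard in _check_no_tag_duplication,
# not the NApp's own code: the scan over the other circuits only runs when
# at least one supplied UNI lacks a user tag. FakeUNI and needs_no_tag_check
# are hypothetical names introduced for illustration only.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeUNI:
    """Hypothetical stand-in for UNI; only user_tag matters here."""
    user_tag: Optional[str] = None


def needs_no_tag_check(uni_a=None, uni_z=None):
    """Return True when at least one supplied UNI has no user tag."""
    return any(uni and not uni.user_tag for uni in (uni_a, uni_z))


print(needs_no_tag_check())
print(needs_no_tag_check(FakeUNI("vlan-100"), FakeUNI("vlan-200")))
print(needs_no_tag_check(FakeUNI(), FakeUNI("vlan-200")))
```

# The two guard clauses of the method collapse into this single predicate:
# with no UNIs, or with every UNI tagged, the method returns before scanning.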

    @listen_to("kytos/topology.link_up")
    def on_link_up(self, event):
        """Change circuit when link is up or end_maintenance."""
        self.handle_link_up(event)

    def handle_link_up(self, event):
        """Change circuit when link is up or end_maintenance."""
        log.info("Event handle_link_up %s", event.content["link"])
        for evc in self.get_evcs_by_svc_level():
            if evc.is_enabled() and not evc.archived:
                with evc.lock:
                    evc.handle_link_up(event.content["link"])

    # Possibly replace this with interruptions?
    @listen_to(
        '.*.switch.interface.(link_up|link_down|created|deleted)'
    )
    def on_interface_link_change(self, event: KytosEvent):
        """
        Handler for interface link_up, link_down, created and deleted events.
        """
        self.handle_on_interface_link_change(event)

    def handle_on_interface_link_change(self, event: KytosEvent):
        """
        Handler to sort interface events {link_(up, down), created, deleted}

        To avoid multiple database updates (link flap):
        Every interface is identified and processed in parallel.
        Once an interface event is received, a timer is started.
        While the timer is running, self._intf_events will be updated.
        After the time has passed, the last received event will be processed.
        """
        iface = event.content.get("interface")
        with self._lock_interfaces[iface.id]:
            _now = event.timestamp
            # Ignore out-of-order events
            if (
                iface.id in self._intf_events
                and self._intf_events[iface.id]["event"].timestamp > _now
            ):
                return
            self._intf_events[iface.id].update({"event": event})
            # Another call is already waiting to process this interface
            if "last_acquired" in self._intf_events[iface.id]:
                return
            self._intf_events[iface.id].update({"last_acquired": now()})
        time.sleep(settings.UNI_STATE_CHANGE_DELAY)
        with self._lock_interfaces[iface.id]:
            event = self._intf_events[iface.id]["event"]
            self._intf_events[iface.id].pop('last_acquired', None)
            _, _, event_type = event.name.rpartition('.')
            if event_type in ('link_up', 'created'):
                self.handle_interface_link_up(iface)
            elif event_type in ('link_down', 'deleted'):
                self.handle_interface_link_down(iface)
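
# A standalone sketch of the per-interface debounce described in the
# docstring of handle_on_interface_link_change, under stated assumptions:
# Debouncer, DELAY and the tuple-based events are hypothetical stand-ins
# for the NApp's _intf_events, settings.UNI_STATE_CHANGE_DELAY and
# KytosEvent. It is an illustration, not the NApp's implementation.

```python
import threading
import time
from collections import defaultdict

DELAY = 0.2  # hypothetical stand-in for settings.UNI_STATE_CHANGE_DELAY


class Debouncer:
    """Process only the last event seen per key within DELAY seconds."""

    def __init__(self, handler):
        self._handler = handler
        self._locks = defaultdict(threading.Lock)
        self._pending = defaultdict(dict)

    def submit(self, key, timestamp, payload):
        with self._locks[key]:
            last = self._pending[key].get("event")
            # Drop events older than the one already recorded.
            if last is not None and last[0] > timestamp:
                return
            self._pending[key]["event"] = (timestamp, payload)
            # Another caller is already waiting for this key.
            if "last_acquired" in self._pending[key]:
                return
            self._pending[key]["last_acquired"] = time.monotonic()
        time.sleep(DELAY)
        with self._locks[key]:
            _, latest = self._pending[key]["event"]
            self._pending[key].pop("last_acquired", None)
            self._handler(key, latest)


processed = []
debouncer = Debouncer(lambda key, payload: processed.append((key, payload)))
threads = [
    threading.Thread(target=debouncer.submit, args=("eth1", ts, state))
    for ts, state in [(1, "link_down"), (2, "link_up"), (3, "link_down")]
]
for thread in threads:
    thread.start()
    time.sleep(0.01)  # keep the arrival order stable for the demo
for thread in threads:
    thread.join()
print(processed)
```

# Three flapping events arrive within the delay window, yet the handler
# runs once, with the most recent state. Only the first caller sleeps;
# later callers record their event and return immediately.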

    def handle_interface_link_up(self, interface):
        """
        Handler for interface link_up events
        """
        for evc in self.get_evcs_by_svc_level():
            with evc.lock:
                log.info("Event handle_interface_link_up %s", interface)
                evc.handle_interface_link_up(
                    interface
                )

    def handle_interface_link_down(self, interface):
        """
        Handler for interface link_down events
        """
        for evc in self.get_evcs_by_svc_level():
            with evc.lock:
                log.info("Event handle_interface_link_down %s", interface)
                evc.handle_interface_link_down(
                    interface
                )

    @listen_to("kytos/topology.link_down")
    def on_link_down(self, event):
        """Change circuit when link is down or under_maintenance."""
        self.handle_link_down(event)

    def handle_link_down(self, event):  # pylint: disable=too-many-branches
        """Change circuit when link is down or under_maintenance."""
        link = event.content["link"]
        log.info("Event handle_link_down %s", link)
        switch_flows = {}
        evcs_with_failover = []
        evcs_normal = []
        check_failover = []
        for evc in self.get_evcs_by_svc_level():
            if evc.is_affected_by_link(link):
                # if there is no failover path, handle link down the
                # traditional way
                if (
                    not getattr(evc, 'failover_path', None) or
                    evc.is_failover_path_affected_by_link(link)
                ):
                    evcs_normal.append(evc)
                    continue
                try:
                    dpid_flows = evc.get_failover_flows()
                # pylint: disable=broad-except
                except Exception:
                    err = traceback.format_exc().replace("\n", ", ")
                    log.error(
                        f"Ignore Failover path for {evc} due to error: {err}"
                    )
                    evcs_normal.append(evc)
                    continue
                for dpid, flows in dpid_flows.items():
                    switch_flows.setdefault(dpid, [])
                    switch_flows[dpid].extend(flows)
                evcs_with_failover.append(evc)
            else:
                check_failover.append(evc)

        # Install failover flows in batches of settings.BATCH_SIZE per switch
        while switch_flows:
            offset = settings.BATCH_SIZE or None
            switches = list(switch_flows.keys())
            for dpid in switches:
                emit_event(
                    self.controller,
                    context="kytos.flow_manager",
                    name="flows.install",
                    content={
                        "dpid": dpid,
                        "flow_dict": {"flows": switch_flows[dpid][:offset]},
                    }
                )
                if offset is None or offset >= len(switch_flows[dpid]):
                    del switch_flows[dpid]
                    continue
                switch_flows[dpid] = switch_flows[dpid][offset:]
            time.sleep(settings.BATCH_INTERVAL)

        for evc in evcs_with_failover:
            with evc.lock:
                old_path = evc.current_path
                evc.current_path = evc.failover_path
                evc.failover_path = old_path
                evc.sync()
            emit_event(self.controller, "redeployed_link_down",
                       content=map_evc_event_content(evc))
            log.info(
                f"{evc} redeployed with failover due to link down {link.id}"
            )

        for evc in evcs_normal:
            emit_event(
                self.controller,
                "evc_affected_by_link_down",
                content={"link_id": link.id} | map_evc_event_content(evc)
            )

        # After handling the hot path, check if new failover paths are needed.
        # Note that EVCs affected by link down will generate a KytosEvent for
        # deployed|redeployed, which will trigger the failover path setup.
        # Thus, we just need to further check the check_failover list
        for evc in check_failover:
            if evc.is_failover_path_affected_by_link(link):
                with evc.lock:
                    evc.setup_failover_path()
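
# A small sketch of the per-switch batching done by the "while switch_flows"
# loop in handle_link_down, assuming plain lists of flows. batch_flows is a
# hypothetical helper, not part of the NApp; where the method emits one
# flows.install event per switch and sleeps between rounds, this version
# just yields each round.

```python
def batch_flows(switch_flows, batch_size):
    """Yield rounds of {dpid: flows}, at most batch_size flows per switch."""
    pending = {dpid: list(flows) for dpid, flows in switch_flows.items()}
    offset = batch_size or None  # 0 or None means "send everything at once"
    while pending:
        current_round = {}
        for dpid in list(pending):
            current_round[dpid] = pending[dpid][:offset]
            if offset is None or offset >= len(pending[dpid]):
                # Nothing left for this switch after this round.
                del pending[dpid]
            else:
                pending[dpid] = pending[dpid][offset:]
        yield current_round


rounds = list(batch_flows({"sw1": [1, 2, 3, 4, 5], "sw2": [6, 7]}, 2))
for number, current_round in enumerate(rounds, start=1):
    print(number, current_round)
```

# A switch drops out of the rotation as soon as its last batch is sent, so
# later rounds only touch switches that still have flows pending.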

    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
    def on_evc_affected_by_link_down(self, event):
        """Change circuit when link is down or under_maintenance."""
        self.handle_evc_affected_by_link_down(event)

    def handle_evc_affected_by_link_down(self, event):
        """Change circuit when link is down or under_maintenance."""
        evc = self.circuits.get(event.content["evc_id"])
        link_id = event.content['link_id']
        if not evc:
            return
        with evc.lock:
            result = evc.handle_link_down()
        event_name = "error_redeploy_link_down"
        if result:
            log.info(f"{evc} redeployed due to link down {link_id}")
            event_name = "redeployed_link_down"
        emit_event(self.controller, event_name,
                   content=map_evc_event_content(evc))

    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
    def on_evc_deployed(self, event):
        """Handle EVC deployed|redeployed_link_(up|down)."""
        self.handle_evc_deployed(event)

    def handle_evc_deployed(self, event):
        """Set up the failover path when an EVC is deployed."""
        evc = self.circuits.get(event.content["evc_id"])
        if not evc:
            return
        with evc.lock:
            evc.setup_failover_path()

    @listen_to("kytos/topology.topology_loaded")
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
        """Load EVCs once the topology is available."""
        self.load_all_evcs()

    def load_all_evcs(self):
        """Try to load all EVCs on startup."""
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
        for circuit_id, circuit in circuits:
            if circuit_id not in self.circuits:
                self._load_evc(circuit)
        emit_event(self.controller, "evcs_loaded", content=dict(circuits),
                   timeout=1)

    def _load_evc(self, circuit_dict):
        """Load one EVC from mongodb to memory."""
        try:
            evc = self._evc_from_dict(circuit_dict)
        except (ValueError, KytosTagError) as exception:
            log.error(
                f"Could not load EVC: dict={circuit_dict} error={exception}"
            )
            return None
        if evc.archived:
            return None

        self.circuits.setdefault(evc.id, evc)
        self.sched.add(evc)
        return evc

    @listen_to("kytos/flow_manager.flow.error")
    def on_flow_mod_error(self, event):
        """Handle flow mod errors related to an EVC."""
        self.handle_flow_mod_error(event)

    def handle_flow_mod_error(self, event):
        """Handle flow mod errors related to an EVC."""
        flow = event.content["flow"]
        command = event.content.get("error_command")
        if command != "add":
            return
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
        if evc:
            with evc.lock:
                evc.remove_current_flows()

    def _evc_dict_with_instances(self, evc_dict):
        """Convert some dict values to instances of EVC classes.

        This method will convert: [UNI, Link]
        """
        data = evc_dict.copy()  # Do not modify the original dict
        for attribute, value in data.items():
            # Get multiple attributes.
            # Ex: uni_a, uni_z
            if "uni" in attribute:
                try:
                    data[attribute] = self._uni_from_dict(value)
                except ValueError as exception:
                    result = "Error creating UNI: Invalid value"
                    raise ValueError(result) from exception

            if attribute == "circuit_scheduler":
                data[attribute] = []
                for schedule in value:
                    data[attribute].append(CircuitSchedule.from_dict(schedule))

            # Get multiple attributes.
            # Ex: primary_links,
            #     backup_links,
            #     current_links_cache,
            #     primary_links_cache,
            #     backup_links_cache
            if "links" in attribute:
                data[attribute] = [
                    self._link_from_dict(link) for link in value
                ]

            # Ex: current_path,
            #     primary_path,
            #     backup_path
            if "path" in attribute and attribute != "dynamic_backup_path":
                data[attribute] = Path(
                    [self._link_from_dict(link) for link in value]
                )

        return data

    def _evc_from_dict(self, evc_dict):
        """Return an EVC object from python dict."""
        data = self._evc_dict_with_instances(evc_dict)
        data["table_group"] = self.table_group
        return EVC(self.controller, **data)

    def _uni_from_dict(self, uni_dict):
        """Return a UNI object from python dict, or False if it is None."""
        if uni_dict is None:
            return False

        interface_id = uni_dict.get("interface_id")
        interface = self.controller.get_interface_by_id(interface_id)
        if interface is None:
            result = (
                "Error creating UNI: "
                + f"Could not instantiate interface {interface_id}"
            )
            raise ValueError(result)
        tag_convert = {1: "vlan"}
        tag_dict = uni_dict.get("tag", None)
        if tag_dict:
            tag_type = tag_dict.get("tag_type")
            tag_type = tag_convert.get(tag_type, tag_type)
            tag_value = tag_dict.get("value")
            if isinstance(tag_value, list):
                tag_value = get_tag_ranges(tag_value)
                mask_list = get_vlan_tags_and_masks(tag_value)
                tag = TAGRange(tag_type, tag_value, mask_list)
            else:
                tag = TAG(tag_type, tag_value)
        else:
            tag = None
        uni = UNI(interface, tag)
        return uni

    def _link_from_dict(self, link_dict):
        """Return a Link object from python dict."""
        id_a = link_dict.get("endpoint_a").get("id")
        id_b = link_dict.get("endpoint_b").get("id")

        endpoint_a = self.controller.get_interface_by_id(id_a)
        endpoint_b = self.controller.get_interface_by_id(id_b)
        if not endpoint_a:
            error_msg = f"Could not get interface endpoint_a id {id_a}"
            raise ValueError(error_msg)
        if not endpoint_b:
            error_msg = f"Could not get interface endpoint_b id {id_b}"
            raise ValueError(error_msg)

        link = Link(endpoint_a, endpoint_b)
        if "metadata" in link_dict:
            link.extend_metadata(link_dict.get("metadata"))

        s_vlan = link.get_metadata("s_vlan")
        if s_vlan:
            tag = TAG.from_dict(s_vlan)
            if tag is False:
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
                raise ValueError(error_msg)
            link.update_metadata("s_vlan", tag)
        return link

    def _find_evc_by_schedule_id(self, schedule_id):
        """
        Find an EVC and CircuitSchedule based on schedule_id.

        :param schedule_id: Schedule ID
        :return: EVC and Schedule
        """
        circuits = self._get_circuits_buffer()
        found_schedule = None
        evc = None

        # pylint: disable=unused-variable
        for c_id, circuit in circuits.items():
            for schedule in circuit.circuit_scheduler:
                if schedule.id == schedule_id:
                    found_schedule = schedule
                    evc = circuit
                    break
            if found_schedule:
                break
        return evc, found_schedule

    def _get_circuits_buffer(self):
        """
        Return the circuit buffer.

        If the buffer is empty, try to load data from mongodb.
        """
        if not self.circuits:
            # Load circuits from mongodb to buffer
            circuits = self.mongo_controller.get_circuits()['circuits']
            for c_id, circuit in circuits.items():
                evc = self._evc_from_dict(circuit)
                self.circuits[c_id] = evc
        return self.circuits

    # pylint: disable=attribute-defined-outside-init
    @alisten_to("kytos/of_multi_table.enable_table")
    async def on_table_enabled(self, event):
        """Handle a recently enabled table."""
        table_group = event.content.get("mef_eline", None)
        if not table_group:
            return
        for group in table_group:
            if group not in settings.TABLE_GROUP_ALLOWED:
                log.error(f'The table group "{group}" is not allowed for '
                          f'mef_eline. Allowed table groups are '
                          f'{settings.TABLE_GROUP_ALLOWED}')
                return
        self.table_group.update(table_group)
        content = {"group_table": self.table_group}
        name = "kytos/mef_eline.enable_table"
        await aemit_event(self.controller, name, content)
1131