Test Failed
Pull Request — master (#396)
by Vinicius
10:08 queued 07:58
created

build.main.Main.update()   F

Complexity

Conditions 15

Size

Total Lines 65
Code Lines 53

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 38
CRAP Score 16.1832

Importance

Changes 0
Metric Value
cc 15
eloc 53
nop 2
dl 0
loc 65
ccs 38
cts 46
cp 0.8261
crap 16.1832
rs 2.9998
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

Complexity

Complex methods like build.main.Main.update() often do a lot of different things. To break such a method down, we need to identify a cohesive component within its class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from threading import Lock
10
11 1
from pydantic import ValidationError
12
13 1
from kytos.core import KytosNApp, log, rest
14 1
from kytos.core.events import KytosEvent
15 1
from kytos.core.exceptions import KytosTagError
16
from kytos.core.helpers import (alisten_to, listen_to, load_spec,
17 1
                                validate_openapi)
18 1
from kytos.core.interface import TAG, UNI, TAGRange
19 1
from kytos.core.link import Link
20
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
21 1
                                 get_json_or_400)
22 1
from kytos.core.tag_ranges import get_tag_ranges
23 1
from napps.kytos.mef_eline import controllers, settings
24
from napps.kytos.mef_eline.exceptions import DisabledSwitch, InvalidPath
25 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
26 1
                                          Path)
27
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
28
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
29
                                         emit_event, get_vlan_tags_and_masks,
30
                                         map_evc_event_content)
31 1
32
33
# pylint: disable=too-many-public-methods
34
class Main(KytosNApp):
35
    """Main class of amlight/mef_eline NApp.
36
37 1
    This class is the entry point for this napp.
38
    """
39 1
40
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
41
42
    def setup(self):
        """Replace the '__init__' method for the KytosNApp subclass.

        The setup method is automatically called by the controller when your
        application is loaded.

        So, if you have any setup routine, insert it here.
        """
        # object used to schedule circuit events
        self.sched = Scheduler()

        # object to save and load circuits
        self.mongo_controller = self.get_eline_controller()
        self.mongo_controller.bootstrap_indexes()

        # set the controller that will manage the dynamic paths
        DynamicPathManager.set_controller(self.controller)

        # dictionary of EVCs created. It acts as a circuit buffer.
        # Every create/update/delete must be synced to mongodb.
        self.circuits = {}

        self.table_group = {"epl": 0, "evpl": 0}
        # guards the consistency routine run by execute()
        self._lock = Lock()
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

        self.load_all_evcs()
        self._topology_updated_at = None
70
71
    def get_evcs_by_svc_level(self) -> list:
        """Get circuits sorted by desc service level and asc creation_time.

        The circuits are read from the in-memory ``self.circuits`` buffer.
        In the future, as more ops are offloaded it should be get from the DB.

        Returns:
            list: the buffered circuits, highest ``service_level`` first;
            ties are ordered by ascending ``creation_time``.
        """
        return sorted(
            self.circuits.values(),
            key=lambda evc: (-evc.service_level, evc.creation_time),
        )
78
79
    @staticmethod
    def get_eline_controller():
        """Return a new ELineController instance.

        Kept as a separate method so the controller used by ``setup`` can be
        replaced in one place.
        """
        return controllers.ELineController()
83 1
84 1
    def execute(self):
        """Run the consistency routine unless a previous run is in progress.

        ``setup`` registers this method to run in a loop, so a new round may
        start while the previous one still holds ``self._lock``; in that case
        the new round is skipped.
        """
        # A non-blocking acquire tests and takes the lock in one atomic step.
        # Checking ``locked()`` first and acquiring afterwards would let two
        # callers both pass the check and then run back to back.
        if not self._lock.acquire(blocking=False):
            return
        try:
            log.debug("Starting consistency routine")
            self.execute_consistency()
        finally:
            self._lock.release()
        log.debug("Finished consistency routine")
92
93 1
    def should_be_checked(self, circuit):
        """Verify if the circuit meets the necessary conditions to be checked.

        A circuit qualifies when it is enabled but not active, its lock is
        free, it has no recently removed flow, it was not recently updated,
        its UNIs are active on the controller switches, and it is either
        intra-switch or has a current path.

        Args:
            circuit: the EVC to evaluate.

        Returns:
            bool: True if the circuit should be checked, otherwise False.
        """
        if not circuit.is_enabled() or circuit.is_active():
            return False
        if circuit.lock.locked():
            return False
        if circuit.has_recent_removed_flow() or circuit.is_recent_updated():
            return False
        if not circuit.are_unis_active(self.controller.switches):
            return False
        # if an inter-switch EVC does not have current_path, it does not
        # make sense to run sdntrace on it
        return bool(circuit.is_intra_switch() or circuit.current_path)
109 1
110 1
    def execute_consistency(self):
        """Execute consistency routine.

        Steps:
          1. Collect the buffered circuits that ``should_be_checked``.
          2. Trace them with ``EVCDeploy.check_list_traces``; circuits that
             pass are activated and synced, the others have their
             ``execution_rounds`` incremented and are redeployed once that
             counter exceeds ``settings.WAIT_FOR_OLD_PATH``.
          3. Load every circuit that is stored in mongodb but missing from
             the in-memory buffer.
        """
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
        circuits_to_check = []
        for circuit in self.get_evcs_by_svc_level():
            # whatever is left in stored_circuits afterwards is not loaded
            stored_circuits.pop(circuit.id, None)
            if self.should_be_checked(circuit):
                circuits_to_check.append(circuit)

        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
        for circuit in circuits_to_check:
            if circuits_checked.get(circuit.id):
                circuit.execution_rounds = 0
                log.info(f"{circuit} enabled but inactive - activating")
                with circuit.lock:
                    circuit.activate()
                    circuit.sync()
                continue
            circuit.execution_rounds += 1
            if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
                log.info(f"{circuit} enabled but inactive - redeploy")
                with circuit.lock:
                    circuit.deploy()

        for circuit_id, circuit_data in stored_circuits.items():
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
            self._load_evc(circuit_data)
136
137
    def shutdown(self):
        """Execute when your napp is unloaded.

        If you have some cleanup procedure, insert it here.
        Currently there is nothing to clean up, so this is a no-op.
        """
142
143
    @rest("/v2/evc/", methods=["GET"])
    def list_circuits(self, request: Request) -> JSONResponse:
        """Endpoint to return circuits stored.

        The ``archived`` query arg (default ``"false"``) is lower-cased and
        forwarded to ``mongo_controller.get_circuits``; every other query arg
        is forwarded as a metadata filter.
        """
        log.debug("list_circuits /v2/evc")
        query = request.query_params
        archived = query.get("archived", "false").lower()
        metadata = {
            key: value for key, value in query.items() if key != "archived"
        }
        circuits = self.mongo_controller.get_circuits(archived=archived,
                                                      metadata=metadata)
        return JSONResponse(circuits['circuits'])
158
159
    @rest("/v2/evc/schedule", methods=["GET"])
    def list_schedules(self, _request: Request) -> JSONResponse:
        """Endpoint to return all schedules stored for all circuits.

        Return a JSON with the following template:
        [{"schedule_id": <schedule_id>,
         "circuit_id": <circuit_id>,
         "schedule": <schedule object>}]

        When no circuit is stored at all, an empty JSON object is returned
        instead of an empty list.
        """
        log.debug("list_schedules /v2/evc/schedule")
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
        if not circuits:
            # NOTE(review): ``{}`` here vs ``[]`` below is inconsistent, but
            # it is kept as-is because API clients may depend on it.
            return JSONResponse({}, status_code=200)

        status = 200
        result = [
            {
                "schedule_id": scheduler.get("id"),
                "circuit_id": circuit.get("id"),
                "schedule": scheduler,
            }
            for circuit in circuits
            # circuits without schedules contribute nothing
            for scheduler in circuit.get("circuit_scheduler") or ()
        ]

        log.debug("list_schedules result %s %s", result, status)
        return JSONResponse(result, status_code=status)
190
191 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
    def get_circuit(self, request: Request) -> JSONResponse:
        """Endpoint to return a circuit based on id.

        The circuit is looked up through ``mongo_controller.get_circuit``.

        Raises:
            HTTPException: 404 when the lookup returns nothing.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("get_circuit /v2/evc/%s", circuit_id)
        circuit = self.mongo_controller.get_circuit(circuit_id)
        if not circuit:
            result = f"circuit_id {circuit_id} not found"
            log.debug("get_circuit result %s %s", result, 404)
            raise HTTPException(404, detail=result)
        status = 200
        log.debug("get_circuit result %s %s", circuit, status)
        return JSONResponse(circuit, status_code=status)
204 1
205
    # pylint: disable=too-many-branches, too-many-statements
206
    @rest("/v2/evc/", methods=["POST"])
    @validate_openapi(spec)
    def create_circuit(self, request: Request) -> JSONResponse:
        """Try to create a new circuit.

        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
        UNI_Z's requested C-VID are available from the interfaces' pools. This
        is checked when creating the UNI object.

        Then, E-Line NApp requests a primary and a backup path to the
        Pathfinder NApp using the attributes primary_links and backup_links
        submitted via REST

        # For each link composing paths in #3:
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
        #  - Using the S-VID obtained, generate abstract flow entries to be
        #    sent to FlowManager

        Push abstract flow entries to FlowManager and FlowManager pushes
        OpenFlow entries to datapaths

        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
        creation

        Finally, notify user of the status of its request.

        Raises:
            HTTPException: 400 for an invalid payload, path, tag list or tag
                reservation; 409 for a disabled component or when the EVC
                already exists.
        """
        # Try to create the circuit object
        log.debug("create_circuit /v2/evc/")
        data = get_json_or_400(request, self.controller.loop)

        try:
            evc = self._evc_from_dict(data)
        except (ValueError, KytosTagError) as exception:
            log.debug("create_circuit result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception

        try:
            check_disabled_component(evc.uni_a, evc.uni_z)
        except DisabledSwitch as exception:
            log.debug("create_circuit result %s %s", exception, 409)
            raise HTTPException(
                    409,
                    detail=f"Path is not valid: {exception}"
                ) from exception

        # primary_path and backup_path go through the same validation
        for path_name in ("primary_path", "backup_path"):
            path = getattr(evc, path_name)
            if not path:
                continue
            try:
                path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise HTTPException(
                    400,
                    detail=f"{path_name} is not valid: {exception}"
                ) from exception

        if self._is_duplicated_evc(evc):
            result = "The EVC already exists."
            log.debug("create_circuit result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        if not evc._tag_lists_equal():
            detail = "UNI_A and UNI_Z tag lists should be the same."
            raise HTTPException(400, detail=detail)

        try:
            evc._validate_has_primary_or_dynamic()
        except ValueError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        try:
            self._use_uni_tags(evc)
        except KytosTagError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        # save circuit
        try:
            evc.sync()
        except ValidationError as exception:
            # The EVC is rejected after its UNI tags were reserved above;
            # give them back, otherwise they stay reserved for a circuit
            # that is never stored.
            evc.remove_uni_tags()
            raise HTTPException(400, detail=str(exception)) from exception

        # store circuit in dictionary
        self.circuits[evc.id] = evc

        # Schedule the circuit deploy
        self.sched.add(evc)

        # Circuit has no schedule, deploy now
        if not evc.circuit_scheduler:
            with evc.lock:
                evc.deploy()

        # Notify users
        result = {"circuit_id": evc.id}
        status = 201
        log.debug("create_circuit result %s %s", result, status)
        emit_event(self.controller, name="created",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)
321 1
322 1
    @staticmethod
    def _use_uni_tags(evc):
        """Reserve the VLAN tags of both UNIs of ``evc``.

        If reserving the tag of ``uni_z`` fails, the tag already taken for
        ``uni_a`` is made available again before the error propagates.

        Raises:
            KytosTagError: propagated from ``evc._use_uni_vlan``.
        """
        uni_a = evc.uni_a
        evc._use_uni_vlan(uni_a)
        try:
            evc._use_uni_vlan(evc.uni_z)
        except KytosTagError:
            evc.make_uni_vlan_available(uni_a)
            # bare raise keeps the original traceback untouched
            raise
332 1
333
    @listen_to('kytos/flow_manager.flow.removed')
    def on_flow_delete(self, event):
        """Capture delete messages to keep track when flows got removed.

        Thin event entry point: the work is done by ``handle_flow_delete``.
        """
        self.handle_flow_delete(event)
337 1
338 1
    def handle_flow_delete(self, event):
        """Keep track when the EVC got flows removed by deriving its cookie.

        The EVC id is derived from the cookie of ``event.content["flow"]``.
        Flows whose cookie does not map to a buffered circuit are ignored.
        """
        flow = event.content["flow"]
        evc_id = EVC.get_id_from_cookie(flow.cookie)
        evc = self.circuits.get(evc_id)
        if not evc:
            return
        log.debug("Flow removed in EVC %s", evc.id)
        evc.set_flow_removed_at()
345
346
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update(self, request: Request) -> JSONResponse:
        """Update a circuit based on payload.

        The EVC attributes (creation_time, active, current_path,
        failover_path, _id, archived) can't be updated.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer; 409
                when it is archived, uses a disabled component or duplicates
                another EVC; 400 when the payload is rejected.
        """
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = request.path_params["circuit_id"]
        log.debug("update /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as exception:
            result = f"circuit_id {circuit_id} not found"
            log.debug("update result %s %s", result, 404)
            # chain from the caught instance, not from the KeyError class
            raise HTTPException(404, detail=result) from exception

        if evc.archived:
            result = "Can't update archived EVC"
            log.debug("update result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        try:
            enable, redeploy = evc.update(
                **self._evc_dict_with_instances(data)
            )
        except (KytosTagError, ValidationError, ValueError) as exception:
            log.debug("update result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception
        except DisabledSwitch as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(
                    409,
                    detail=f"Path is not valid: {exception}"
                ) from exception

        # NOTE(review): this check runs after evc.update() was applied, so a
        # rejected request may leave the buffered EVC modified - confirm.
        if self._is_duplicated_evc(evc):
            result = "The EVC already exists."
            log.debug("update result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        self._apply_update_actions(evc, enable, redeploy)

        result = {evc.id: evc.as_dict()}
        status = 200

        log.debug("update result %s %s", result, status)
        emit_event(self.controller, "updated",
                   content=map_evc_event_content(evc, **data))
        return JSONResponse(result, status_code=status)

    @staticmethod
    def _apply_update_actions(evc, enable, redeploy):
        """Remove and/or deploy the flows of ``evc`` after an update.

        Args:
            evc: the circuit that was just updated.
            enable: first value returned by ``evc.update``; only the exact
                values True and False trigger an action.
            redeploy: second value returned by ``evc.update``; anything but
                None requests a redeploy of an active circuit.
        """
        if evc.is_active():
            if enable is False:  # disable if active
                with evc.lock:
                    evc.remove()
            elif redeploy is not None:  # redeploy if active
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        elif enable is True:  # enable if inactive
            with evc.lock:
                evc.deploy()
411 1
412 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
    def delete_circuit(self, request: Request) -> JSONResponse:
        """Remove a circuit.

        With the EVC lock held: the current and failover flows are removed,
        the EVC is deactivated and disabled, its scheduled jobs are removed,
        it is archived, its UNI tags are released and it is synced.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer or is
                already archived.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as exception:
            result = f"circuit_id {circuit_id} not found"
            log.debug("delete_circuit result %s %s", result, 404)
            # chain from the caught instance, not from the KeyError class
            raise HTTPException(404, detail=result) from exception

        if evc.archived:
            result = f"Circuit {circuit_id} already removed"
            log.debug("delete_circuit result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        log.info("Removing %s", evc)
        with evc.lock:
            evc.remove_current_flows()
            evc.remove_failover_flows(sync=False)
            evc.deactivate()
            evc.disable()
            self.sched.remove(evc)
            evc.archive()
            evc.remove_uni_tags()
            evc.sync()
        log.info("EVC removed. %s", evc)
        result = {"response": f"Circuit {circuit_id} removed"}
        status = 200

        log.debug("delete_circuit result %s %s", result, status)
        emit_event(self.controller, "deleted",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)
451
452
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
    def get_metadata(self, request: Request) -> JSONResponse:
        """Get metadata from an EVC.

        Returns:
            JSONResponse: ``{"metadata": <metadata of the buffered EVC>}``.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error
        return JSONResponse({"metadata": evc.metadata})
465 1
466 1 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
    @validate_openapi(spec)
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to a bulk of EVCs.

        The payload carries ``circuit_ids`` plus the metadata key/values.
        The database is updated first, for all the given ids, and only then
        the buffered EVCs are extended.

        Raises:
            HTTPException: 404, with the list of ids missing from the
                buffer, raised after the other EVCs were already updated.
        """
        data = get_json_or_400(request, self.controller.loop)
        circuit_ids = data.pop("circuit_ids")

        self.mongo_controller.update_evcs(circuit_ids, data, "add")

        fail_evcs = []
        for _id in circuit_ids:
            evc = self.circuits.get(_id)
            if evc is None:
                fail_evcs.append(_id)
                continue
            evc.extend_metadata(data)

        if fail_evcs:
            raise HTTPException(404, detail=fail_evcs)
        return JSONResponse("Operation successful", status_code=201)
486 1
487
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
    @validate_openapi(spec)
    def add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to an EVC.

        Raises:
            HTTPException: 400 when the payload is not a JSON object; 404
                when the circuit is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        metadata = get_json_or_400(request, self.controller.loop)
        if not isinstance(metadata, dict):
            # f-string so the offending value is actually interpolated
            raise HTTPException(400, f"Invalid metadata value: {metadata}")
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error

        evc.extend_metadata(metadata)
        evc.sync()
        return JSONResponse("Operation successful", status_code=201)
506 1
507 1 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
    @validate_openapi(spec)
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
        """Delete metadata from a bulk of EVCs.

        The payload carries ``circuit_ids``; ``key`` comes from the URL.
        The database is updated first, for all the given ids, and only then
        the key is removed from the buffered EVCs.

        Raises:
            HTTPException: 404, with the list of ids missing from the
                buffer, raised after the other EVCs were already updated.
        """
        data = get_json_or_400(request, self.controller.loop)
        key = request.path_params["key"]
        circuit_ids = data.pop("circuit_ids")
        self.mongo_controller.update_evcs(circuit_ids, {key: ""}, "del")

        fail_evcs = []
        for _id in circuit_ids:
            evc = self.circuits.get(_id)
            if evc is None:
                fail_evcs.append(_id)
                continue
            evc.remove_metadata(key)

        if fail_evcs:
            raise HTTPException(404, detail=fail_evcs)
        return JSONResponse("Operation successful")
527
528
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
    def delete_metadata(self, request: Request) -> JSONResponse:
        """Delete metadata from an EVC.

        Removes ``key`` from the buffered EVC and syncs it.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer.
        """
        params = request.path_params
        circuit_id = params["circuit_id"]
        key = params["key"]
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error

        evc.remove_metadata(key)
        evc.sync()
        return JSONResponse("Operation successful")
544
545 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
    def redeploy(self, request: Request) -> JSONResponse:
        """Endpoint to force the redeployment of an EVC.

        An enabled EVC has its current flows removed and is deployed again
        (202). A disabled EVC is left untouched and a 409 response is
        returned.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as exception:
            # chain from the caught instance, not from the KeyError class
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found"
            ) from exception
        if evc.is_enabled():
            with evc.lock:
                evc.remove_current_flows()
                evc.deploy()
            result = {"response": f"Circuit {circuit_id} redeploy received."}
            status = 202
        else:
            result = {"response": f"Circuit {circuit_id} is disabled."}
            status = 409

        return JSONResponse(result, status_code=status)
568
569
    @rest("/v2/evc/schedule/", methods=["POST"])
    @validate_openapi(spec)
    def create_schedule(self, request: Request) -> JSONResponse:
        """
        Create a new schedule for a given circuit.

        This service does not check if there are conflicts with another
        schedule.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }

        Raises:
            HTTPException: 404 when the circuit is not found; 409 when it is
                archived.
        """
        log.debug("create_schedule /v2/evc/schedule/")
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = data["circuit_id"]
        schedule_data = data["schedule"]

        # Get EVC from circuits buffer
        circuits = self._get_circuits_buffer()
        evc = circuits.get(circuit_id)

        if not evc:
            result = f"circuit_id {circuit_id} not found"
            log.debug("create_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)
        # Can not modify circuits deleted and archived
        if evc.archived:
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
            log.debug("create_schedule result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        # new schedule from dict
        new_schedule = CircuitSchedule.from_dict(schedule_data)

        # If there is no schedule, create the list
        if not evc.circuit_scheduler:
            evc.circuit_scheduler = []

        # Add the new schedule
        evc.circuit_scheduler.append(new_schedule)

        # Add schedule job
        self.sched.add_circuit_job(evc, new_schedule)

        # save circuit to mongodb
        evc.sync()

        result = new_schedule.as_dict()
        status = 201

        log.debug("create_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)
630
631
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update_schedule(self, request: Request) -> JSONResponse:
        """Update a schedule.

        Change all attributes from the given schedule from a EVC circuit.
        The schedule ID is preserved as default.
        Payload example:
            {
              "date": "2019-08-07T14:52:10.967Z",
              "interval": "string",
              "frequency": "1 * * * *",
              "action": "create"
            }

        Raises:
            HTTPException: 404 when the schedule is not found; 409 when its
                circuit is archived.
        """
        data = get_json_or_400(request, self.controller.loop)
        schedule_id = request.path_params["schedule_id"]
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)

        # Try to find a circuit schedule
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)

        if not found_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("update_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)
        # Can not modify circuits deleted and archived
        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("update_schedule result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        new_schedule = CircuitSchedule.from_dict(data)
        # keep the id so the replacement is still the same schedule
        new_schedule.id = found_schedule.id
        # Remove the old schedule
        evc.circuit_scheduler.remove(found_schedule)
        # Append the modified schedule
        evc.circuit_scheduler.append(new_schedule)

        # Cancel all schedule jobs
        self.sched.cancel_job(found_schedule.id)
        # Add the new circuit schedule
        self.sched.add_circuit_job(evc, new_schedule)
        # Save EVC to mongodb
        evc.sync()

        result = new_schedule.as_dict()
        status = 200

        log.debug("update_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)
682
683
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
    def delete_schedule(self, request: Request) -> JSONResponse:
        """Remove a circuit schedule.

        Remove the Schedule from EVC.
        Remove the Schedule from cron job.
        Save the EVC to mongodb.

        Raises:
            HTTPException: 404 when the schedule is not found; 409 when its
                circuit is archived.
        """
        schedule_id = request.path_params["schedule_id"]
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)

        if not found_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("delete_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # Can not modify circuits deleted and archived
        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("delete_schedule result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        # Remove the old schedule
        evc.circuit_scheduler.remove(found_schedule)

        # Cancel all schedule jobs
        self.sched.cancel_job(found_schedule.id)
        # Save EVC to mongodb
        evc.sync()

        result = "Schedule removed"
        status = 200

        log.debug("delete_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)
719 1
720 1
    def _is_duplicated_evc(self, evc):
721 1
        """Verify if the circuit given is duplicated with the stored evcs.
722
723 1
        Args:
724 1
            evc (EVC): circuit to be analysed.
725
726
        Returns:
727
            boolean: True if the circuit is duplicated, otherwise False.
728 1
729
        """
730 1
        for circuit in tuple(self.circuits.values()):
731 1
            if (not circuit.archived and circuit._id != evc._id
732 1
                    and circuit.shares_uni(evc)):
733 1
                return True
734 1
        return False
735
736
    @listen_to("kytos/topology.link_up")
    def on_link_up(self, event):
        """Listen to topology link_up events and delegate the handling.

        The actual work is done by :meth:`handle_link_up`.
        """
        self.handle_link_up(event)
740
741 1
    def handle_link_up(self, event):
        """Notify every enabled, non-archived EVC that a link came up.

        The link is read from ``event.content["link"]`` and passed to
        each EVC's ``handle_link_up`` while holding that EVC's lock.
        """
        link = event.content["link"]
        log.info("Event handle_link_up %s", link)
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_enabled() or evc.archived:
                continue
            with evc.lock:
                evc.handle_link_up(link)
748
749
    # Possibly replace this with interruptions?
750
    @listen_to(
        '.*.switch.interface.(link_up|link_down|created|deleted)',
        pool='dynamic_single'
    )
    def on_interface_link_change(self, event: KytosEvent):
        """Dispatch interface link_up/link_down/created/deleted events.

        The last dotted component of ``event.name`` selects the handler:
        ``link_up`` and ``created`` go to handle_interface_link_up,
        ``link_down`` and ``deleted`` go to handle_interface_link_down.
        The dispatch runs while holding ``self._lock``.
        """
        with self._lock:
            action = event.name.rpartition('.')[2]
            interface = event.content.get("interface")
            if action in ('link_up', 'created'):
                self.handle_interface_link_up(interface)
                return
            if action in ('link_down', 'deleted'):
                self.handle_interface_link_down(interface)
765
766
    def handle_interface_link_up(self, interface):
        """Forward an interface-up notification to every EVC.

        Each EVC returned by ``get_evcs_by_svc_level`` gets its
        ``handle_interface_link_up`` called with ``interface``; one log
        entry is written per EVC.
        """
        for circuit in self.get_evcs_by_svc_level():
            log.info("Event handle_interface_link_up %s", interface)
            circuit.handle_interface_link_up(interface)
775
776
    def handle_interface_link_down(self, interface):
        """Forward an interface-down notification to every EVC.

        Each EVC returned by ``get_evcs_by_svc_level`` gets its
        ``handle_interface_link_down`` called with ``interface``; one log
        entry is written per EVC.
        """
        for circuit in self.get_evcs_by_svc_level():
            log.info("Event handle_interface_link_down %s", interface)
            circuit.handle_interface_link_down(interface)
785 1
786 1
    @listen_to("kytos/topology.link_down")
    def on_link_down(self, event):
        """Listen to topology link_down events and delegate the handling.

        The actual work is done by :meth:`handle_link_down`.
        """
        self.handle_link_down(event)
790 1
791
    def handle_link_down(self, event):  # pylint: disable=too-many-branches
        """React to a link going down for every known EVC.

        EVCs crossing the link that own a failover path not crossing it
        have their failover flows sent in batches to flow_manager and then
        get their current and failover paths swapped. The remaining
        affected EVCs are announced with an ``evc_affected_by_link_down``
        event. EVCs not affected by the link get a new failover path when
        their failover path used the link.
        """
        link = event.content["link"]
        log.info("Event handle_link_down %s", link)

        flows_by_dpid = {}
        failover_ready = []
        needs_redeploy = []
        unaffected = []
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_affected_by_link(link):
                unaffected.append(evc)
                continue
            # Without a usable failover path the EVC goes through the
            # traditional link down handling.
            if (
                not getattr(evc, 'failover_path', None)
                or evc.is_failover_path_affected_by_link(link)
            ):
                needs_redeploy.append(evc)
                continue
            try:
                failover_flows = evc.get_failover_flows()
            # pylint: disable=broad-except
            except Exception:
                err = traceback.format_exc().replace("\n", ", ")
                log.error(
                    f"Ignore Failover path for {evc} due to error: {err}"
                )
                needs_redeploy.append(evc)
                continue
            for dpid, flows in failover_flows.items():
                flows_by_dpid.setdefault(dpid, []).extend(flows)
            failover_ready.append(evc)

        # Send the flows per switch in slices of settings.BATCH_SIZE
        # (everything at once when it is falsy), sleeping between rounds.
        while flows_by_dpid:
            batch_size = settings.BATCH_SIZE or None
            for dpid in list(flows_by_dpid):
                pending = flows_by_dpid[dpid]
                emit_event(
                    self.controller,
                    context="kytos.flow_manager",
                    name="flows.install",
                    content={
                        "dpid": dpid,
                        "flow_dict": {"flows": pending[:batch_size]},
                    }
                )
                if batch_size is None or batch_size >= len(pending):
                    del flows_by_dpid[dpid]
                else:
                    flows_by_dpid[dpid] = pending[batch_size:]
            time.sleep(settings.BATCH_INTERVAL)

        # Swap current and failover paths of the EVCs that failed over.
        for evc in failover_ready:
            with evc.lock:
                evc.current_path, evc.failover_path = (
                    evc.failover_path, evc.current_path
                )
                evc.sync()
            emit_event(self.controller, "redeployed_link_down",
                       content=map_evc_event_content(evc))
            log.info(
                f"{evc} redeployed with failover due to link down {link.id}"
            )

        for evc in needs_redeploy:
            emit_event(
                self.controller,
                "evc_affected_by_link_down",
                content={"link_id": link.id} | map_evc_event_content(evc)
            )

        # After handling the hot path, check if new failover paths are
        # needed. EVCs affected by link down will generate a KytosEvent for
        # deployed|redeployed, which will trigger the failover path setup,
        # so only the unaffected EVCs need a further check here.
        for evc in unaffected:
            if not evc.is_failover_path_affected_by_link(link):
                continue
            with evc.lock:
                evc.setup_failover_path()
872 1
873 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
    def on_evc_affected_by_link_down(self, event):
        """Listen to evc_affected_by_link_down events and delegate.

        The actual work is done by
        :meth:`handle_evc_affected_by_link_down`.
        """
        self.handle_evc_affected_by_link_down(event)
877 1
878
    def handle_evc_affected_by_link_down(self, event):
        """Run the EVC link down handling and report the outcome.

        The event content must carry ``evc_id`` and ``link_id``. Unknown
        EVC ids are ignored. A ``redeployed_link_down`` event is emitted
        when ``evc.handle_link_down()`` returns a truthy value, otherwise
        ``error_redeploy_link_down`` is emitted.
        """
        content = event.content
        evc = self.circuits.get(content["evc_id"])
        link_id = content['link_id']
        if not evc:
            return
        with evc.lock:
            redeployed = evc.handle_link_down()
        if redeployed:
            log.info(f"{evc} redeployed due to link down {link_id}")
            event_name = "redeployed_link_down"
        else:
            event_name = "error_redeploy_link_down"
        emit_event(self.controller, event_name,
                   content=map_evc_event_content(evc))
892
893 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
    def on_evc_deployed(self, event):
        """Listen to deployed, redeployed_link_up and redeployed_link_down.

        The actual work is done by :meth:`handle_evc_deployed`.
        """
        self.handle_evc_deployed(event)
897
898 1
    def handle_evc_deployed(self, event):
        """Set up the failover path of the EVC named in the event.

        The EVC is looked up by ``event.content["evc_id"]``; unknown ids
        are ignored. The setup runs while holding the EVC lock.
        """
        evc = self.circuits.get(event.content["evc_id"])
        if evc:
            with evc.lock:
                evc.setup_failover_path()
905 1
906
    @listen_to("kytos/topology.topology_loaded")
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
        """Listen to topology_loaded and load all EVCs.

        The event itself is not inspected; the loading is done by
        :meth:`load_all_evcs`.
        """
        self.load_all_evcs()
910 1
911
    def load_all_evcs(self):
        """Load every stored circuit that is not in memory yet.

        Circuits are read from ``mongo_controller.get_circuits()`` and the
        ones whose id is missing from ``self.circuits`` are handed to
        :meth:`_load_evc`.
        """
        stored = self.mongo_controller.get_circuits()['circuits']
        for circuit_id, circuit in stored.items():
            if circuit_id in self.circuits:
                continue
            self._load_evc(circuit)
917
918 1
    def _load_evc(self, circuit_dict):
919 1
        """Load one EVC from mongodb to memory."""
920 1
        try:
921
            evc = self._evc_from_dict(circuit_dict)
922 1
        except ValueError as exception:
923 1
            log.error(
924
                f"Could not load EVC: dict={circuit_dict} error={exception}"
925
            )
926
            return None
927 1
        except KytosTagError as exception:
928
            log.error(
929 1
                f"Could not load EVC: dict={circuit_dict} error={exception}"
930 1
            )
931 1
            return None
932
        if evc.archived:
933 1
            return None
934 1
935 1
        self.circuits.setdefault(evc.id, evc)
936 1
        self.sched.add(evc)
937
        return evc
938 1
939
    @listen_to("kytos/flow_manager.flow.error")
    def on_flow_mod_error(self, event):
        """Listen to flow_manager flow.error events and delegate.

        The actual work is done by :meth:`handle_flow_mod_error`.
        """
        self.handle_flow_mod_error(event)
943 1
944 1
    def handle_flow_mod_error(self, event):
        """Remove the current flows of the EVC whose flow add failed.

        Only events whose ``error_command`` is ``"add"`` are considered.
        The EVC is found through the id derived from the flow cookie;
        unknown EVCs are ignored.
        """
        content = event.content
        flow = content["flow"]
        if content.get("error_command") != "add":
            return
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
        if not evc:
            return
        with evc.lock:
            evc.remove_current_flows()
954 1
955 1
    def _evc_dict_with_instances(self, evc_dict):
956 1
        """Convert some dict values to instance of EVC classes.
957 1
958
        This method will convert: [UNI, Link]
959
        """
960
        data = evc_dict.copy()  # Do not modify the original dict
961
        for attribute, value in data.items():
962
            # Get multiple attributes.
963
            # Ex: uni_a, uni_z
964
            if "uni" in attribute:
965 1
                try:
966 1
                    data[attribute] = self._uni_from_dict(value)
967
                except ValueError as exception:
968
                    result = "Error creating UNI: Invalid value"
969
                    raise ValueError(result) from exception
970
971
            if attribute == "circuit_scheduler":
972
                data[attribute] = []
973 1
                for schedule in value:
974 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
975
976
            # Get multiple attributes.
977
            # Ex: primary_links,
978 1
            #     backup_links,
979
            #     current_links_cache,
980 1
            #     primary_links_cache,
981 1
            #     backup_links_cache
982 1
            if "links" in attribute:
983 1
                data[attribute] = [
984
                    self._link_from_dict(link) for link in value
985 1
                ]
986
987 1
            # Ex: current_path,
988 1
            #     primary_path,
989
            #     backup_path
990 1
            if "path" in attribute and attribute != "dynamic_backup_path":
991 1
                data[attribute] = Path(
992 1
                    [self._link_from_dict(link) for link in value]
993 1
                )
994
995
        return data
996
997 1
    def _evc_from_dict(self, evc_dict):
        """Create an EVC from its dict representation.

        The dict values are first converted by
        :meth:`_evc_dict_with_instances`, then ``table_group`` is set from
        ``self.table_group`` before the EVC is instantiated.
        """
        evc_kwargs = self._evc_dict_with_instances(evc_dict)
        evc_kwargs["table_group"] = self.table_group
        return EVC(self.controller, **evc_kwargs)
1001 1
1002 1
    def _uni_from_dict(self, uni_dict):
1003 1
        """Return a UNI object from python dict."""
1004 1
        if uni_dict is None:
1005
            return False
1006 1
1007 1
        interface_id = uni_dict.get("interface_id")
1008
        interface = self.controller.get_interface_by_id(interface_id)
1009 1
        if interface is None:
1010
            result = (
1011 1
                "Error creating UNI:"
1012
                + f"Could not instantiate interface {interface_id}"
1013 1
            )
1014 1
            raise ValueError(result) from ValueError
1015
        tag_convert = {1: "vlan"}
1016 1
        tag_dict = uni_dict.get("tag", None)
1017 1
        if tag_dict:
1018 1
            tag_type = tag_dict.get("tag_type")
1019 1
            tag_type = tag_convert.get(tag_type, tag_type)
1020 1
            tag_value = tag_dict.get("value")
1021 1
            if isinstance(tag_value, list):
1022
                tag_value = get_tag_ranges(tag_value)
1023
                mask_list = get_vlan_tags_and_masks(tag_value)
1024
                tag = TAGRange(tag_type, tag_value, mask_list)
1025 1
            else:
1026 1
                tag = TAG(tag_type, tag_value)
1027 1
        else:
1028
            tag = None
1029 1
        uni = UNI(interface, tag)
1030 1
        return uni
1031 1
1032 1
    def _link_from_dict(self, link_dict):
1033
        """Return a Link object from python dict."""
1034
        id_a = link_dict.get("endpoint_a").get("id")
1035 1
        id_b = link_dict.get("endpoint_b").get("id")
1036 1
1037
        endpoint_a = self.controller.get_interface_by_id(id_a)
1038 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
1039
        if not endpoint_a:
1040
            error_msg = f"Could not get interface endpoint_a id {id_a}"
1041
            raise ValueError(error_msg)
1042
        if not endpoint_b:
1043
            error_msg = f"Could not get interface endpoint_b id {id_b}"
1044
            raise ValueError(error_msg)
1045 1
1046 1
        link = Link(endpoint_a, endpoint_b)
1047 1
        if "metadata" in link_dict:
1048
            link.extend_metadata(link_dict.get("metadata"))
1049
1050 1
        s_vlan = link.get_metadata("s_vlan")
1051 1
        if s_vlan:
1052 1
            tag = TAG.from_dict(s_vlan)
1053 1
            if tag is False:
1054 1
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
1055 1
                raise ValueError(error_msg)
1056 1
            link.update_metadata("s_vlan", tag)
1057 1
        return link
1058 1
1059
    def _find_evc_by_schedule_id(self, schedule_id):
1060 1
        """
1061
        Find an EVC and CircuitSchedule based on schedule_id.
1062
1063
        :param schedule_id: Schedule ID
1064
        :return: EVC and Schedule
1065
        """
1066 1
        circuits = self._get_circuits_buffer()
1067
        found_schedule = None
1068 1
        evc = None
1069 1
1070 1
        # pylint: disable=unused-variable
1071 1
        for c_id, circuit in circuits.items():
1072 1
            for schedule in circuit.circuit_scheduler:
1073
                if schedule.id == schedule_id:
1074
                    found_schedule = schedule
1075 1
                    evc = circuit
1076 1
                    break
1077
            if found_schedule:
1078 1
                break
1079 1
        return evc, found_schedule
1080
1081 1
    def _get_circuits_buffer(self):
1082 1
        """
1083 1
        Return the circuit buffer.
1084
1085
        If the buffer is empty, try to load data from mongodb.
1086 1
        """
1087 1
        if not self.circuits:
1088 1
            # Load circuits from mongodb to buffer
1089 1
            circuits = self.mongo_controller.get_circuits()['circuits']
1090 1
            for c_id, circuit in circuits.items():
1091
                evc = self._evc_from_dict(circuit)
1092
                self.circuits[c_id] = evc
1093
        return self.circuits
1094
1095
    # pylint: disable=attribute-defined-outside-init
1096
    @alisten_to("kytos/of_multi_table.enable_table")
    async def on_table_enabled(self, event):
        """Apply the table groups that were enabled for mef_eline.

        The groups come from ``event.content["mef_eline"]``. If any of
        them is not in ``settings.TABLE_GROUP_ALLOWED`` an error is logged
        and nothing is changed. Otherwise ``self.table_group`` is updated
        and a ``kytos/mef_eline.enable_table`` event is emitted.
        """
        requested = event.content.get("mef_eline", None)
        if not requested:
            return

        for group in requested:
            if group in settings.TABLE_GROUP_ALLOWED:
                continue
            log.error(f'The table group "{group}" is not allowed for '
                      f'mef_eline. Allowed table groups are '
                      f'{settings.TABLE_GROUP_ALLOWED}')
            return

        self.table_group.update(requested)
        await aemit_event(
            self.controller,
            "kytos/mef_eline.enable_table",
            {"group_table": self.table_group},
        )
1112