Passed
Push — master ( e3f2ff...728489 )
by Vinicius
02:32 queued 14s
created

build.main.Main.create_circuit()   D

Complexity

Conditions 12

Size

Total Lines 105
Code Lines 63

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 37
CRAP Score 13.3876

Importance

Changes 0
Metric Value
cc 12
eloc 63
nop 2
dl 0
loc 105
ccs 37
cts 47
cp 0.7872
crap 13.3876
rs 4.5709
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like build.main.Main.create_circuit() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
from threading import Lock
9
10 1
from pydantic import ValidationError
11
12 1
from kytos.core import KytosNApp, log, rest
13 1
from kytos.core.helpers import (alisten_to, listen_to, load_spec,
14
                                validate_openapi)
15 1
from kytos.core.interface import TAG, UNI
16 1
from kytos.core.link import Link
17 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
18
                                 get_json_or_400)
19 1
from napps.kytos.mef_eline import controllers, settings
20 1
from napps.kytos.mef_eline.exceptions import DisabledSwitch, InvalidPath
21 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
22
                                          Path)
23 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
24 1
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
25
                                         emit_event, map_evc_event_content)
26
27
28
# pylint: disable=too-many-public-methods
29 1
class Main(KytosNApp):
30
    """Main class of amlight/mef_eline NApp.
31
32
    This class is the entry point for this napp.
33
    """
34
35 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
36
37 1
    def setup(self):
38
        """Replace the '__init__' method for the KytosNApp subclass.
39
40
        The setup method is automatically called by the controller when your
41
        application is loaded.
42
43
        So, if you have any setup routine, insert it here.
44
        """
45
        # object used to scheduler circuit events
46 1
        self.sched = Scheduler()
47
48
        # object to save and load circuits
49 1
        self.mongo_controller = self.get_eline_controller()
50 1
        self.mongo_controller.bootstrap_indexes()
51
52
        # set the controller that will manager the dynamic paths
53 1
        DynamicPathManager.set_controller(self.controller)
54
55
        # dictionary of EVCs created. It acts as a circuit buffer.
56
        # Every create/update/delete must be synced to mongodb.
57 1
        self.circuits = {}
58
59 1
        self.table_group = {"epl": 0, "evpl": 0}
60 1
        self._lock = Lock()
61 1
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)
62
63 1
        self.load_all_evcs()
64
65 1
    def get_evcs_by_svc_level(self) -> list:
66
        """Get circuits sorted by desc service level and asc creation_time.
67
68
        In the future, as more ops are offloaded it should be get from the DB.
69
        """
70 1
        return sorted(self.circuits.values(),
71
                      key=lambda x: (-x.service_level, x.creation_time))
72
73 1
    @staticmethod
74 1
    def get_eline_controller():
75
        """Return the ELineController instance."""
76
        return controllers.ELineController()
77
78 1
    def execute(self):
79
        """Execute once when the napp is running."""
80 1
        if self._lock.locked():
81 1
            return
82 1
        log.debug("Starting consistency routine")
83 1
        with self._lock:
84 1
            self.execute_consistency()
85 1
        log.debug("Finished consistency routine")
86
87 1
    @staticmethod
88 1
    def should_be_checked(circuit):
89
        "Verify if the circuit meets the necessary conditions to be checked"
90
        # pylint: disable=too-many-boolean-expressions
91 1
        if (
92
                circuit.is_enabled()
93
                and not circuit.is_active()
94
                and not circuit.lock.locked()
95
                and not circuit.has_recent_removed_flow()
96
                and not circuit.is_recent_updated()
97
                # if a inter-switch EVC does not have current_path, it does not
98
                # make sense to run sdntrace on it
99
                and (circuit.is_intra_switch() or circuit.current_path)
100
                ):
101 1
            return True
102
        return False
103
104 1
    def execute_consistency(self):
105
        """Execute consistency routine."""
106 1
        circuits_to_check = []
107 1
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
108 1
        for circuit in self.get_evcs_by_svc_level():
109 1
            stored_circuits.pop(circuit.id, None)
110 1
            if self.should_be_checked(circuit):
111 1
                circuits_to_check.append(circuit)
112 1
        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
113 1
        for circuit in circuits_to_check:
114 1
            is_checked = circuits_checked.get(circuit.id)
115 1
            if is_checked:
116 1
                circuit.execution_rounds = 0
117 1
                log.info(f"{circuit} enabled but inactive - activating")
118 1
                with circuit.lock:
119 1
                    circuit.activate()
120 1
                    circuit.sync()
121
            else:
122 1
                circuit.execution_rounds += 1
123 1
                if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
124 1
                    log.info(f"{circuit} enabled but inactive - redeploy")
125 1
                    with circuit.lock:
126 1
                        circuit.deploy()
127 1
        for circuit_id in stored_circuits:
128 1
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
129 1
            self._load_evc(stored_circuits[circuit_id])
130
131 1
    def shutdown(self):
132
        """Execute when your napp is unloaded.
133
134
        If you have some cleanup procedure, insert it here.
135
        """
136
137 1
    @rest("/v2/evc/", methods=["GET"])
138 1
    def list_circuits(self, request: Request) -> JSONResponse:
139
        """Endpoint to return circuits stored.
140
141
        archive query arg if defined (not null) will be filtered
142
        accordingly, by default only non archived evcs will be listed
143
        """
144 1
        log.debug("list_circuits /v2/evc")
145 1
        args = request.query_params
146 1
        archived = args.get("archived", "false").lower()
147 1
        args = {k: v for k, v in args.items() if k not in {"archived"}}
148 1
        circuits = self.mongo_controller.get_circuits(archived=archived,
149
                                                      metadata=args)
150 1
        circuits = circuits['circuits']
151 1
        return JSONResponse(circuits)
152
153 1
    @rest("/v2/evc/schedule", methods=["GET"])
154 1
    def list_schedules(self, _request: Request) -> JSONResponse:
155
        """Endpoint to return all schedules stored for all circuits.
156
157
        Return a JSON with the following template:
158
        [{"schedule_id": <schedule_id>,
159
         "circuit_id": <circuit_id>,
160
         "schedule": <schedule object>}]
161
        """
162 1
        log.debug("list_schedules /v2/evc/schedule")
163 1
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
164 1
        if not circuits:
165 1
            result = {}
166 1
            status = 200
167 1
            return JSONResponse(result, status_code=status)
168
169 1
        result = []
170 1
        status = 200
171 1
        for circuit in circuits:
172 1
            circuit_scheduler = circuit.get("circuit_scheduler")
173 1
            if circuit_scheduler:
174 1
                for scheduler in circuit_scheduler:
175 1
                    value = {
176
                        "schedule_id": scheduler.get("id"),
177
                        "circuit_id": circuit.get("id"),
178
                        "schedule": scheduler,
179
                    }
180 1
                    result.append(value)
181
182 1
        log.debug("list_schedules result %s %s", result, status)
183 1
        return JSONResponse(result, status_code=status)
184
185 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
186 1
    def get_circuit(self, request: Request) -> JSONResponse:
187
        """Endpoint to return a circuit based on id."""
188 1
        circuit_id = request.path_params["circuit_id"]
189 1
        log.debug("get_circuit /v2/evc/%s", circuit_id)
190 1
        circuit = self.mongo_controller.get_circuit(circuit_id)
191 1
        if not circuit:
192 1
            result = f"circuit_id {circuit_id} not found"
193 1
            log.debug("get_circuit result %s %s", result, 404)
194 1
            raise HTTPException(404, detail=result)
195 1
        status = 200
196 1
        log.debug("get_circuit result %s %s", circuit, status)
197 1
        return JSONResponse(circuit, status_code=status)
198
199 1
    @rest("/v2/evc/", methods=["POST"])
200 1
    @validate_openapi(spec)
201 1
    def create_circuit(self, request: Request) -> JSONResponse:
202
        """Try to create a new circuit.
203
204
        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
205
        UNI_Z's requested C-VID are available from the interfaces' pools. This
206
        is checked when creating the UNI object.
207
208
        Then, E-Line NApp requests a primary and a backup path to the
209
        Pathfinder NApp using the attributes primary_links and backup_links
210
        submitted via REST
211
212
        # For each link composing paths in #3:
213
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
214
        #  - Using the S-VID obtained, generate abstract flow entries to be
215
        #    sent to FlowManager
216
217
        Push abstract flow entries to FlowManager and FlowManager pushes
218
        OpenFlow entries to datapaths
219
220
        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
221
        creation
222
223
        Finnaly, notify user of the status of its request.
224
        """
225
        # Try to create the circuit object
226 1
        log.debug("create_circuit /v2/evc/")
227 1
        data = get_json_or_400(request, self.controller.loop)
228
229 1
        try:
230 1
            evc = self._evc_from_dict(data)
231 1
        except ValueError as exception:
232 1
            log.debug("create_circuit result %s %s", exception, 400)
233 1
            raise HTTPException(400, detail=str(exception)) from exception
234
235 1
        try:
236 1
            check_disabled_component(evc.uni_a, evc.uni_z)
237 1
        except DisabledSwitch as exception:
238 1
            log.debug("create_circuit result %s %s", exception, 409)
239 1
            raise HTTPException(
240
                    409,
241
                    detail=f"Path is not valid: {exception}"
242
                ) from exception
243
244 1
        if evc.primary_path:
245
            try:
246
                evc.primary_path.is_valid(
247
                    evc.uni_a.interface.switch,
248
                    evc.uni_z.interface.switch,
249
                    bool(evc.circuit_scheduler),
250
                )
251
            except InvalidPath as exception:
252
                raise HTTPException(
253
                    400,
254
                    detail=f"primary_path is not valid: {exception}"
255
                ) from exception
256 1
        if evc.backup_path:
257
            try:
258
                evc.backup_path.is_valid(
259
                    evc.uni_a.interface.switch,
260
                    evc.uni_z.interface.switch,
261
                    bool(evc.circuit_scheduler),
262
                )
263
            except InvalidPath as exception:
264
                raise HTTPException(
265
                    400,
266
                    detail=f"backup_path is not valid: {exception}"
267
                ) from exception
268
269
        # verify duplicated evc
270 1
        if self._is_duplicated_evc(evc):
271 1
            result = "The EVC already exists."
272 1
            log.debug("create_circuit result %s %s", result, 409)
273 1
            raise HTTPException(409, detail=result)
274
275 1
        try:
276 1
            evc._validate_has_primary_or_dynamic()
277 1
        except ValueError as exception:
278 1
            raise HTTPException(400, detail=str(exception)) from exception
279
280
        # save circuit
281 1
        try:
282 1
            evc.sync()
283
        except ValidationError as exception:
284
            raise HTTPException(400, detail=str(exception)) from exception
285
286
        # store circuit in dictionary
287 1
        self.circuits[evc.id] = evc
288
289
        # Schedule the circuit deploy
290 1
        self.sched.add(evc)
291
292
        # Circuit has no schedule, deploy now
293 1
        if not evc.circuit_scheduler:
294 1
            with evc.lock:
295 1
                evc.deploy()
296
297
        # Notify users
298 1
        result = {"circuit_id": evc.id}
299 1
        status = 201
300 1
        log.debug("create_circuit result %s %s", result, status)
301 1
        emit_event(self.controller, name="created",
302
                   content=map_evc_event_content(evc))
303 1
        return JSONResponse(result, status_code=status)
304
305 1
    @listen_to('kytos/flow_manager.flow.removed')
306 1
    def on_flow_delete(self, event):
307
        """Capture delete messages to keep track when flows got removed."""
308
        self.handle_flow_delete(event)
309
310 1
    def handle_flow_delete(self, event):
311
        """Keep track when the EVC got flows removed by deriving its cookie."""
312 1
        flow = event.content["flow"]
313 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
314 1
        if evc:
315 1
            log.debug("Flow removed in EVC %s", evc.id)
316 1
            evc.set_flow_removed_at()
317
318 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
319 1
    @validate_openapi(spec)
320 1
    def update(self, request: Request) -> JSONResponse:
321
        """Update a circuit based on payload.
322
323
        The EVC attributes (creation_time, active, current_path,
324
        failover_path, _id, archived) can't be updated.
325
        """
326 1
        data = get_json_or_400(request, self.controller.loop)
327 1
        circuit_id = request.path_params["circuit_id"]
328 1
        log.debug("update /v2/evc/%s", circuit_id)
329 1
        try:
330 1
            evc = self.circuits[circuit_id]
331 1
        except KeyError:
332 1
            result = f"circuit_id {circuit_id} not found"
333 1
            log.debug("update result %s %s", result, 404)
334 1
            raise HTTPException(404, detail=result) from KeyError
335
336 1
        if evc.archived:
337 1
            result = "Can't update archived EVC"
338 1
            log.debug("update result %s %s", result, 409)
339 1
            raise HTTPException(409, detail=result)
340
341 1
        try:
342 1
            enable, redeploy = evc.update(
343
                **self._evc_dict_with_instances(data)
344
            )
345 1
        except ValidationError as exception:
346
            raise HTTPException(400, detail=str(exception)) from exception
347 1
        except ValueError as exception:
348 1
            log.error(exception)
349 1
            log.debug("update result %s %s", exception, 400)
350 1
            raise HTTPException(400, detail=str(exception)) from exception
351 1
        except DisabledSwitch as exception:
352 1
            log.debug("update result %s %s", exception, 409)
353 1
            raise HTTPException(
354
                    409,
355
                    detail=f"Path is not valid: {exception}"
356
                ) from exception
357
358 1
        if evc.is_active():
359
            if enable is False:  # disable if active
360
                with evc.lock:
361
                    evc.remove()
362
            elif redeploy is not None:  # redeploy if active
363
                with evc.lock:
364
                    evc.remove()
365
                    evc.deploy()
366
        else:
367 1
            if enable is True:  # enable if inactive
368 1
                with evc.lock:
369 1
                    evc.deploy()
370 1
        result = {evc.id: evc.as_dict()}
371 1
        status = 200
372
373 1
        log.debug("update result %s %s", result, status)
374 1
        emit_event(self.controller, "updated",
375
                   content=map_evc_event_content(evc, **data))
376 1
        return JSONResponse(result, status_code=status)
377
378 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
379 1
    def delete_circuit(self, request: Request) -> JSONResponse:
380
        """Remove a circuit.
381
382
        First, the flows are removed from the switches, and then the EVC is
383
        disabled.
384
        """
385 1
        circuit_id = request.path_params["circuit_id"]
386 1
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
387 1
        try:
388 1
            evc = self.circuits[circuit_id]
389 1
        except KeyError:
390 1
            result = f"circuit_id {circuit_id} not found"
391 1
            log.debug("delete_circuit result %s %s", result, 404)
392 1
            raise HTTPException(404, detail=result) from KeyError
393
394 1
        if evc.archived:
395 1
            result = f"Circuit {circuit_id} already removed"
396 1
            log.debug("delete_circuit result %s %s", result, 404)
397 1
            raise HTTPException(404, detail=result)
398
399 1
        log.info("Removing %s", evc)
400 1
        with evc.lock:
401 1
            evc.remove_current_flows()
402 1
            evc.remove_failover_flows(sync=False)
403 1
            evc.deactivate()
404 1
            evc.disable()
405 1
            self.sched.remove(evc)
406 1
            evc.archive()
407 1
            evc.sync()
408 1
        log.info("EVC removed. %s", evc)
409 1
        result = {"response": f"Circuit {circuit_id} removed"}
410 1
        status = 200
411
412 1
        log.debug("delete_circuit result %s %s", result, status)
413 1
        emit_event(self.controller, "deleted",
414
                   content=map_evc_event_content(evc))
415 1
        return JSONResponse(result, status_code=status)
416
417 1
    @rest("v2/evc/{circuit_id}/metadata", methods=["GET"])
418 1
    def get_metadata(self, request: Request) -> JSONResponse:
419
        """Get metadata from an EVC."""
420 1
        circuit_id = request.path_params["circuit_id"]
421 1
        try:
422 1
            return (
423
                JSONResponse({"metadata": self.circuits[circuit_id].metadata})
424
            )
425
        except KeyError as error:
426
            raise HTTPException(
427
                404,
428
                detail=f"circuit_id {circuit_id} not found."
429
            ) from error
430
431 1 View Code Duplication
    @rest("v2/evc/metadata", methods=["POST"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
432 1
    @validate_openapi(spec)
433 1
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
434
        """Add metadata to a bulk of EVCs."""
435 1
        data = get_json_or_400(request, self.controller.loop)
436 1
        circuit_ids = data.pop("circuit_ids")
437
438 1
        self.mongo_controller.update_evcs(circuit_ids, data, "add")
439
440 1
        fail_evcs = []
441 1
        for _id in circuit_ids:
442 1
            try:
443 1
                evc = self.circuits[_id]
444 1
                evc.extend_metadata(data)
445 1
            except KeyError:
446 1
                fail_evcs.append(_id)
447
448 1
        if fail_evcs:
449 1
            raise HTTPException(404, detail=fail_evcs)
450 1
        return JSONResponse("Operation successful", status_code=201)
451
452 1
    @rest("v2/evc/{circuit_id}/metadata", methods=["POST"])
453 1
    @validate_openapi(spec)
454 1
    def add_metadata(self, request: Request) -> JSONResponse:
455
        """Add metadata to an EVC."""
456 1
        circuit_id = request.path_params["circuit_id"]
457 1
        metadata = get_json_or_400(request, self.controller.loop)
458 1
        if not isinstance(metadata, dict):
459
            raise HTTPException(400, "Invalid metadata value: {metadata}")
460 1
        try:
461 1
            evc = self.circuits[circuit_id]
462 1
        except KeyError as error:
463 1
            raise HTTPException(
464
                404,
465
                detail=f"circuit_id {circuit_id} not found."
466
            ) from error
467
468 1
        evc.extend_metadata(metadata)
469 1
        evc.sync()
470 1
        return JSONResponse("Operation successful", status_code=201)
471
472 1 View Code Duplication
    @rest("v2/evc/metadata/{key}", methods=["DELETE"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
473 1
    @validate_openapi(spec)
474 1
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
475
        """Delete metada from a bulk of EVCs"""
476 1
        data = get_json_or_400(request, self.controller.loop)
477 1
        key = request.path_params["key"]
478 1
        circuit_ids = data.pop("circuit_ids")
479 1
        self.mongo_controller.update_evcs(circuit_ids, {key: ""}, "del")
480
481 1
        fail_evcs = []
482 1
        for _id in circuit_ids:
483 1
            try:
484 1
                evc = self.circuits[_id]
485 1
                evc.remove_metadata(key)
486
            except KeyError:
487
                fail_evcs.append(_id)
488
489 1
        if fail_evcs:
490
            raise HTTPException(404, detail=fail_evcs)
491 1
        return JSONResponse("Operation successful")
492
493 1
    @rest("v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
494 1
    def delete_metadata(self, request: Request) -> JSONResponse:
495
        """Delete metadata from an EVC."""
496 1
        circuit_id = request.path_params["circuit_id"]
497 1
        key = request.path_params["key"]
498 1
        try:
499 1
            evc = self.circuits[circuit_id]
500 1
        except KeyError as error:
501 1
            raise HTTPException(
502
                404,
503
                detail=f"circuit_id {circuit_id} not found."
504
            ) from error
505
506 1
        evc.remove_metadata(key)
507 1
        evc.sync()
508 1
        return JSONResponse("Operation successful")
509
510 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
511 1
    def redeploy(self, request: Request) -> JSONResponse:
512
        """Endpoint to force the redeployment of an EVC."""
513 1
        circuit_id = request.path_params["circuit_id"]
514 1
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
515 1
        try:
516 1
            evc = self.circuits[circuit_id]
517 1
        except KeyError:
518 1
            raise HTTPException(
519
                404,
520
                detail=f"circuit_id {circuit_id} not found"
521
            ) from KeyError
522 1
        if evc.is_enabled():
523 1
            with evc.lock:
524 1
                evc.remove_current_flows()
525 1
                evc.deploy()
526 1
            result = {"response": f"Circuit {circuit_id} redeploy received."}
527 1
            status = 202
528
        else:
529 1
            result = {"response": f"Circuit {circuit_id} is disabled."}
530 1
            status = 409
531
532 1
        return JSONResponse(result, status_code=status)
533
534 1
    @rest("/v2/evc/schedule/", methods=["POST"])
535 1
    @validate_openapi(spec)
536 1
    def create_schedule(self, request: Request) -> JSONResponse:
537
        """
538
        Create a new schedule for a given circuit.
539
540
        This service do no check if there are conflicts with another schedule.
541
        Payload example:
542
            {
543
              "circuit_id":"aa:bb:cc",
544
              "schedule": {
545
                "date": "2019-08-07T14:52:10.967Z",
546
                "interval": "string",
547
                "frequency": "1 * * * *",
548
                "action": "create"
549
              }
550
            }
551
        """
552 1
        log.debug("create_schedule /v2/evc/schedule/")
553 1
        data = get_json_or_400(request, self.controller.loop)
554 1
        circuit_id = data["circuit_id"]
555 1
        schedule_data = data["schedule"]
556
557
        # Get EVC from circuits buffer
558 1
        circuits = self._get_circuits_buffer()
559
560
        # get the circuit
561 1
        evc = circuits.get(circuit_id)
562
563
        # get the circuit
564 1
        if not evc:
565 1
            result = f"circuit_id {circuit_id} not found"
566 1
            log.debug("create_schedule result %s %s", result, 404)
567 1
            raise HTTPException(404, detail=result)
568
        # Can not modify circuits deleted and archived
569 1
        if evc.archived:
570 1
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
571 1
            log.debug("create_schedule result %s %s", result, 409)
572 1
            raise HTTPException(409, detail=result)
573
574
        # new schedule from dict
575 1
        new_schedule = CircuitSchedule.from_dict(schedule_data)
576
577
        # If there is no schedule, create the list
578 1
        if not evc.circuit_scheduler:
579 1
            evc.circuit_scheduler = []
580
581
        # Add the new schedule
582 1
        evc.circuit_scheduler.append(new_schedule)
583
584
        # Add schedule job
585 1
        self.sched.add_circuit_job(evc, new_schedule)
586
587
        # save circuit to mongodb
588 1
        evc.sync()
589
590 1
        result = new_schedule.as_dict()
591 1
        status = 201
592
593 1
        log.debug("create_schedule result %s %s", result, status)
594 1
        return JSONResponse(result, status_code=status)
595
596 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
597 1
    @validate_openapi(spec)
598 1
    def update_schedule(self, request: Request) -> JSONResponse:
599
        """Update a schedule.
600
601
        Change all attributes from the given schedule from a EVC circuit.
602
        The schedule ID is preserved as default.
603
        Payload example:
604
            {
605
              "date": "2019-08-07T14:52:10.967Z",
606
              "interval": "string",
607
              "frequency": "1 * * *",
608
              "action": "create"
609
            }
610
        """
611 1
        data = get_json_or_400(request, self.controller.loop)
612 1
        schedule_id = request.path_params["schedule_id"]
613 1
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)
614
615
        # Try to find a circuit schedule
616 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
617
618
        # Can not modify circuits deleted and archived
619 1
        if not found_schedule:
620 1
            result = f"schedule_id {schedule_id} not found"
621 1
            log.debug("update_schedule result %s %s", result, 404)
622 1
            raise HTTPException(404, detail=result)
623 1
        if evc.archived:
624 1
            result = f"Circuit {evc.id} is archived. Update is forbidden."
625 1
            log.debug("update_schedule result %s %s", result, 409)
626 1
            raise HTTPException(409, detail=result)
627
628 1
        new_schedule = CircuitSchedule.from_dict(data)
629 1
        new_schedule.id = found_schedule.id
630
        # Remove the old schedule
631 1
        evc.circuit_scheduler.remove(found_schedule)
632
        # Append the modified schedule
633 1
        evc.circuit_scheduler.append(new_schedule)
634
635
        # Cancel all schedule jobs
636 1
        self.sched.cancel_job(found_schedule.id)
637
        # Add the new circuit schedule
638 1
        self.sched.add_circuit_job(evc, new_schedule)
639
        # Save EVC to mongodb
640 1
        evc.sync()
641
642 1
        result = new_schedule.as_dict()
643 1
        status = 200
644
645 1
        log.debug("update_schedule result %s %s", result, status)
646 1
        return JSONResponse(result, status_code=status)
647
648 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
649 1
    def delete_schedule(self, request: Request) -> JSONResponse:
650
        """Remove a circuit schedule.
651
652
        Remove the Schedule from EVC.
653
        Remove the Schedule from cron job.
654
        Save the EVC to the Storehouse.
655
        """
656 1
        schedule_id = request.path_params["schedule_id"]
657 1
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
658 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
659
660
        # Can not modify circuits deleted and archived
661 1
        if not found_schedule:
662 1
            result = f"schedule_id {schedule_id} not found"
663 1
            log.debug("delete_schedule result %s %s", result, 404)
664 1
            raise HTTPException(404, detail=result)
665
666 1
        if evc.archived:
667 1
            result = f"Circuit {evc.id} is archived. Update is forbidden."
668 1
            log.debug("delete_schedule result %s %s", result, 409)
669 1
            raise HTTPException(409, detail=result)
670
671
        # Remove the old schedule
672 1
        evc.circuit_scheduler.remove(found_schedule)
673
674
        # Cancel all schedule jobs
675 1
        self.sched.cancel_job(found_schedule.id)
676
        # Save EVC to mongodb
677 1
        evc.sync()
678
679 1
        result = "Schedule removed"
680 1
        status = 200
681
682 1
        log.debug("delete_schedule result %s %s", result, status)
683 1
        return JSONResponse(result, status_code=status)
684
685 1
    def _is_duplicated_evc(self, evc):
686
        """Verify if the circuit given is duplicated with the stored evcs.
687
688
        Args:
689
            evc (EVC): circuit to be analysed.
690
691
        Returns:
692
            boolean: True if the circuit is duplicated, otherwise False.
693
694
        """
695 1
        for circuit in tuple(self.circuits.values()):
696 1
            if not circuit.archived and circuit.shares_uni(evc):
697 1
                return True
698 1
        return False
699
700 1
    @listen_to("kytos/topology.link_up")
701 1
    def on_link_up(self, event):
702
        """Change circuit when link is up or end_maintenance."""
703
        self.handle_link_up(event)
704
705 1
    def handle_link_up(self, event):
706
        """Change circuit when link is up or end_maintenance."""
707 1
        log.info("Event handle_link_up %s", event.content["link"])
708 1
        for evc in self.get_evcs_by_svc_level():
709 1
            if evc.is_enabled() and not evc.archived:
710 1
                with evc.lock:
711 1
                    evc.handle_link_up(event.content["link"])
712
713 1
    @listen_to("kytos/topology.link_down")
714 1
    def on_link_down(self, event):
715
        """Change circuit when link is down or under_mantenance."""
716
        self.handle_link_down(event)
717
718 1
    def handle_link_down(self, event):
719
        """Change circuit when link is down or under_mantenance."""
720 1
        link = event.content["link"]
721 1
        log.info("Event handle_link_down %s", link)
722 1
        switch_flows = {}
723 1
        evcs_with_failover = []
724 1
        evcs_normal = []
725 1
        check_failover = []
726 1
        for evc in self.get_evcs_by_svc_level():
727 1
            if evc.is_affected_by_link(link):
728
                # if there is no failover path, handles link down the
729
                # tradditional way
730 1
                if (
731
                    not getattr(evc, 'failover_path', None) or
732
                    evc.is_failover_path_affected_by_link(link)
733
                ):
734 1
                    evcs_normal.append(evc)
735 1
                    continue
736 1
                for dpid, flows in evc.get_failover_flows().items():
737 1
                    switch_flows.setdefault(dpid, [])
738 1
                    switch_flows[dpid].extend(flows)
739 1
                evcs_with_failover.append(evc)
740
            else:
741 1
                check_failover.append(evc)
742
743 1
        while switch_flows:
744 1
            offset = settings.BATCH_SIZE or None
745 1
            switches = list(switch_flows.keys())
746 1
            for dpid in switches:
747 1
                emit_event(
748
                    self.controller,
749
                    context="kytos.flow_manager",
750
                    name="flows.install",
751
                    content={
752
                        "dpid": dpid,
753
                        "flow_dict": {"flows": switch_flows[dpid][:offset]},
754
                    }
755
                )
756 1
                if offset is None or offset >= len(switch_flows[dpid]):
757 1
                    del switch_flows[dpid]
758 1
                    continue
759 1
                switch_flows[dpid] = switch_flows[dpid][offset:]
760 1
            time.sleep(settings.BATCH_INTERVAL)
761
762 1
        for evc in evcs_with_failover:
763 1
            with evc.lock:
764 1
                old_path = evc.current_path
765 1
                evc.current_path = evc.failover_path
766 1
                evc.failover_path = old_path
767 1
                evc.sync()
768 1
            emit_event(self.controller, "redeployed_link_down",
769
                       content=map_evc_event_content(evc))
770 1
            log.info(
771
                f"{evc} redeployed with failover due to link down {link.id}"
772
            )
773
774 1
        for evc in evcs_normal:
775 1
            emit_event(
776
                self.controller,
777
                "evc_affected_by_link_down",
778
                content={"link_id": link.id} | map_evc_event_content(evc)
779
            )
780
781
        # After handling the hot path, check if new failover paths are needed.
782
        # Note that EVCs affected by link down will generate a KytosEvent for
783
        # deployed|redeployed, which will trigger the failover path setup.
784
        # Thus, we just need to further check the check_failover list
785 1
        for evc in check_failover:
786 1
            if evc.is_failover_path_affected_by_link(link):
787 1
                evc.setup_failover_path()
788
789 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
790 1
    def on_evc_affected_by_link_down(self, event):
791
        """Change circuit when link is down or under_mantenance."""
792
        self.handle_evc_affected_by_link_down(event)
793
794 1
    def handle_evc_affected_by_link_down(self, event):
795
        """Change circuit when link is down or under_mantenance."""
796 1
        evc = self.circuits.get(event.content["evc_id"])
797 1
        link_id = event.content['link_id']
798 1
        if not evc:
799 1
            return
800 1
        with evc.lock:
801 1
            result = evc.handle_link_down()
802 1
        event_name = "error_redeploy_link_down"
803 1
        if result:
804 1
            log.info(f"{evc} redeployed due to link down {link_id}")
805 1
            event_name = "redeployed_link_down"
806 1
        emit_event(self.controller, event_name,
807
                   content=map_evc_event_content(evc))
808
809 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
810 1
    def on_evc_deployed(self, event):
811
        """Handle EVC deployed|redeployed_link_down."""
812
        self.handle_evc_deployed(event)
813
814 1
    def handle_evc_deployed(self, event):
815
        """Setup failover path on evc deployed."""
816 1
        evc = self.circuits.get(event.content["evc_id"])
817 1
        if not evc:
818 1
            return
819 1
        with evc.lock:
820 1
            evc.setup_failover_path()
821
822 1
    @listen_to("kytos/topology.topology_loaded")
823 1
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
824
        """Load EVCs once the topology is available."""
825
        self.load_all_evcs()
826
827 1
    def load_all_evcs(self):
828
        """Try to load all EVCs on startup."""
829 1
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
830 1
        for circuit_id, circuit in circuits:
831 1
            if circuit_id not in self.circuits:
832 1
                self._load_evc(circuit)
833
834 1
    def _load_evc(self, circuit_dict):
835
        """Load one EVC from mongodb to memory."""
836 1
        try:
837 1
            evc = self._evc_from_dict(circuit_dict)
838 1
        except ValueError as exception:
839 1
            log.error(
840
                f"Could not load EVC: dict={circuit_dict} error={exception}"
841
            )
842 1
            return None
843
844 1
        if evc.archived:
845 1
            return None
846 1
        evc.deactivate()
847 1
        evc.sync()
848 1
        self.circuits.setdefault(evc.id, evc)
849 1
        self.sched.add(evc)
850 1
        return evc
851
852 1
    @listen_to("kytos/flow_manager.flow.error")
853 1
    def on_flow_mod_error(self, event):
854
        """Handle flow mod errors related to an EVC."""
855
        self.handle_flow_mod_error(event)
856
857 1
    def handle_flow_mod_error(self, event):
858
        """Handle flow mod errors related to an EVC."""
859 1
        flow = event.content["flow"]
860 1
        command = event.content.get("error_command")
861 1
        if command != "add":
862
            return
863 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
864 1
        if evc:
865 1
            evc.remove_current_flows()
866
867 1
    def _evc_dict_with_instances(self, evc_dict):
868
        """Convert some dict values to instance of EVC classes.
869
870
        This method will convert: [UNI, Link]
871
        """
872 1
        data = evc_dict.copy()  # Do not modify the original dict
873 1
        for attribute, value in data.items():
874
            # Get multiple attributes.
875
            # Ex: uni_a, uni_z
876 1
            if "uni" in attribute:
877 1
                try:
878 1
                    data[attribute] = self._uni_from_dict(value)
879 1
                except ValueError as exception:
880 1
                    result = "Error creating UNI: Invalid value"
881 1
                    raise ValueError(result) from exception
882
883 1
            if attribute == "circuit_scheduler":
884 1
                data[attribute] = []
885 1
                for schedule in value:
886 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
887
888
            # Get multiple attributes.
889
            # Ex: primary_links,
890
            #     backup_links,
891
            #     current_links_cache,
892
            #     primary_links_cache,
893
            #     backup_links_cache
894 1
            if "links" in attribute:
895 1
                data[attribute] = [
896
                    self._link_from_dict(link) for link in value
897
                ]
898
899
            # Ex: current_path,
900
            #     primary_path,
901
            #     backup_path
902 1
            if "path" in attribute and attribute != "dynamic_backup_path":
903 1
                data[attribute] = Path(
904
                    [self._link_from_dict(link) for link in value]
905
                )
906
907 1
        return data
908
909 1
    def _evc_from_dict(self, evc_dict):
910 1
        data = self._evc_dict_with_instances(evc_dict)
911 1
        data["table_group"] = self.table_group
912 1
        return EVC(self.controller, **data)
913
914 1
    def _uni_from_dict(self, uni_dict):
915
        """Return a UNI object from python dict."""
916 1
        if uni_dict is None:
917 1
            return False
918
919 1
        interface_id = uni_dict.get("interface_id")
920 1
        interface = self.controller.get_interface_by_id(interface_id)
921 1
        if interface is None:
922 1
            result = (
923
                "Error creating UNI:"
924
                + f"Could not instantiate interface {interface_id}"
925
            )
926 1
            raise ValueError(result) from ValueError
927
928 1
        tag_dict = uni_dict.get("tag", None)
929 1
        if tag_dict:
930 1
            tag = TAG.from_dict(tag_dict)
931
        else:
932 1
            tag = None
933 1
        uni = UNI(interface, tag)
934
935 1
        return uni
936
937 1
    def _link_from_dict(self, link_dict):
938
        """Return a Link object from python dict."""
939 1
        id_a = link_dict.get("endpoint_a").get("id")
940 1
        id_b = link_dict.get("endpoint_b").get("id")
941
942 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
943 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
944 1
        if not endpoint_a:
945 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
946 1
            raise ValueError(error_msg)
947 1
        if not endpoint_b:
948
            error_msg = f"Could not get interface endpoint_b id {id_b}"
949
            raise ValueError(error_msg)
950
951 1
        link = Link(endpoint_a, endpoint_b)
952 1
        if "metadata" in link_dict:
953 1
            link.extend_metadata(link_dict.get("metadata"))
954
955 1
        s_vlan = link.get_metadata("s_vlan")
956 1
        if s_vlan:
957 1
            tag = TAG.from_dict(s_vlan)
958 1
            if tag is False:
959
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
960
                raise ValueError(error_msg)
961 1
            link.update_metadata("s_vlan", tag)
962 1
        return link
963
964 1
    def _find_evc_by_schedule_id(self, schedule_id):
965
        """
966
        Find an EVC and CircuitSchedule based on schedule_id.
967
968
        :param schedule_id: Schedule ID
969
        :return: EVC and Schedule
970
        """
971 1
        circuits = self._get_circuits_buffer()
972 1
        found_schedule = None
973 1
        evc = None
974
975
        # pylint: disable=unused-variable
976 1
        for c_id, circuit in circuits.items():
977 1
            for schedule in circuit.circuit_scheduler:
978 1
                if schedule.id == schedule_id:
979 1
                    found_schedule = schedule
980 1
                    evc = circuit
981 1
                    break
982 1
            if found_schedule:
983 1
                break
984 1
        return evc, found_schedule
985
986 1
    def _get_circuits_buffer(self):
987
        """
988
        Return the circuit buffer.
989
990
        If the buffer is empty, try to load data from mongodb.
991
        """
992 1
        if not self.circuits:
993
            # Load circuits from mongodb to buffer
994 1
            circuits = self.mongo_controller.get_circuits()['circuits']
995 1
            for c_id, circuit in circuits.items():
996 1
                evc = self._evc_from_dict(circuit)
997 1
                self.circuits[c_id] = evc
998 1
        return self.circuits
999
1000
    # pylint: disable=attribute-defined-outside-init
1001 1
    @alisten_to("kytos/of_multi_table.enable_table")
1002 1
    async def on_table_enabled(self, event):
1003
        """Handle a recently table enabled."""
1004 1
        table_group = event.content.get("mef_eline", None)
1005 1
        if not table_group:
1006
            return
1007 1
        for group in table_group:
1008 1
            if group not in settings.TABLE_GROUP_ALLOWED:
1009 1
                log.error(f'The table group "{group}" is not allowed for '
1010
                          f'mef_eline. Allowed table groups are '
1011
                          f'{settings.TABLE_GROUP_ALLOWED}')
1012 1
                return
1013 1
        self.table_group.update(table_group)
1014 1
        content = {"group_table": self.table_group}
1015 1
        name = "kytos/mef_eline.enable_table"
1016
        await aemit_event(self.controller, name, content)
1017