Test Failed
Pull Request — master (#383)
by
unknown
04:55
created

build.main.Main.update_schedule()   A

Complexity

Conditions 3

Size

Total Lines 51
Code Lines 26

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 25
CRAP Score 3

Importance

Changes 0
Metric Value
cc 3
eloc 26
nop 2
dl 0
loc 51
rs 9.256
c 0
b 0
f 0
ccs 25
cts 25
cp 1
crap 3

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from threading import Lock
10
11 1
from pydantic import ValidationError
12
13 1
from kytos.core import KytosNApp, log, rest
14 1
from kytos.core.events import KytosEvent
15
from kytos.core.helpers import (alisten_to, listen_to, load_spec,
16 1
                                validate_openapi)
17 1
from kytos.core.interface import TAG, UNI
18 1
from kytos.core.link import Link
19
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
20 1
                                 get_json_or_400)
21 1
from napps.kytos.mef_eline import controllers, settings
22 1
from napps.kytos.mef_eline.exceptions import DisabledSwitch, InvalidPath
23
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
24 1
                                          Path)
25 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
26
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
27
                                         emit_event, map_evc_event_content)
28
29
30 1
# pylint: disable=too-many-public-methods
31
class Main(KytosNApp):
32
    """Main class of amlight/mef_eline NApp.
33
34
    This class is the entry point for this napp.
35
    """
36 1
37
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
38 1
39
    def setup(self):
40
        """Replace the '__init__' method for the KytosNApp subclass.
41
42
        The setup method is automatically called by the controller when your
43
        application is loaded.
44
45
        So, if you have any setup routine, insert it here.
46
        """
47 1
        # object used to scheduler circuit events
48
        self.sched = Scheduler()
49
50 1
        # object to save and load circuits
51 1
        self.mongo_controller = self.get_eline_controller()
52
        self.mongo_controller.bootstrap_indexes()
53
54 1
        # set the controller that will manager the dynamic paths
55
        DynamicPathManager.set_controller(self.controller)
56
57
        # dictionary of EVCs created. It acts as a circuit buffer.
58 1
        # Every create/update/delete must be synced to mongodb.
59
        self.circuits = {}
60 1
61 1
        self.table_group = {"epl": 0, "evpl": 0}
62 1
        self._lock = Lock()
63
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)
64 1
65 1
        self.load_all_evcs()
66
        self._topology_updated_at = None
67 1
68
    def get_evcs_by_svc_level(self) -> list:
69
        """Get circuits sorted by desc service level and asc creation_time.
70
71
        In the future, as more ops are offloaded it should be get from the DB.
72 1
        """
73
        return sorted(self.circuits.values(),
74
                      key=lambda x: (-x.service_level, x.creation_time))
75 1
76 1
    @staticmethod
77
    def get_eline_controller():
78
        """Return the ELineController instance."""
79
        return controllers.ELineController()
80 1
81
    def execute(self):
82 1
        """Execute once when the napp is running."""
83 1
        if self._lock.locked():
84 1
            return
85 1
        log.debug("Starting consistency routine")
86 1
        with self._lock:
87 1
            self.execute_consistency()
88
        log.debug("Finished consistency routine")
89 1
90
    def should_be_checked(self, circuit):
91
        "Verify if the circuit meets the necessary conditions to be checked"
92 1
        # pylint: disable=too-many-boolean-expressions
93
        if (
94
                circuit.is_enabled()
95
                and not circuit.is_active()
96
                and not circuit.lock.locked()
97
                and not circuit.has_recent_removed_flow()
98
                and not circuit.is_recent_updated()
99
                and circuit.are_unis_active(self.controller.switches)
100
                # if a inter-switch EVC does not have current_path, it does not
101
                # make sense to run sdntrace on it
102
                and (circuit.is_intra_switch() or circuit.current_path)
103 1
                ):
104
            return True
105
        return False
106 1
107
    def execute_consistency(self):
108 1
        """Execute consistency routine."""
109 1
        circuits_to_check = []
110 1
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
111 1
        for circuit in self.get_evcs_by_svc_level():
112 1
            stored_circuits.pop(circuit.id, None)
113 1
            if self.should_be_checked(circuit):
114 1
                circuits_to_check.append(circuit)
115 1
        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
116 1
        for circuit in circuits_to_check:
117 1
            is_checked = circuits_checked.get(circuit.id)
118 1
            if is_checked:
119 1
                circuit.execution_rounds = 0
120 1
                log.info(f"{circuit} enabled but inactive - activating")
121 1
                with circuit.lock:
122 1
                    circuit.activate()
123
                    circuit.sync()
124 1
            else:
125 1
                circuit.execution_rounds += 1
126 1
                if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
127 1
                    log.info(f"{circuit} enabled but inactive - redeploy")
128 1
                    with circuit.lock:
129 1
                        circuit.deploy()
130 1
        for circuit_id in stored_circuits:
131 1
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
132
            self._load_evc(stored_circuits[circuit_id])
133 1
134
    def shutdown(self):
135
        """Execute when your napp is unloaded.
136
137
        If you have some cleanup procedure, insert it here.
138
        """
139 1
140 1
    @rest("/v2/evc/", methods=["GET"])
141
    def list_circuits(self, request: Request) -> JSONResponse:
142
        """Endpoint to return circuits stored.
143
144
        archive query arg if defined (not null) will be filtered
145
        accordingly, by default only non archived evcs will be listed
146 1
        """
147 1
        log.debug("list_circuits /v2/evc")
148 1
        args = request.query_params
149 1
        archived = args.get("archived", "false").lower()
150 1
        check_consistency = args.get("checkConsistency", "false").lower()
151
        args = {k: v for k, v in args.items() if k not in {"archived", "checkConsistency"}}
152 1
        circuits = self.mongo_controller.get_circuits(archived=archived,
153 1
                                                      metadata=args)
154
        circuits = circuits['circuits']
155 1
        if check_consistency == 'true':
156 1
            check_circuits = [value for key, value in self.circuits.items() if key in circuits]
157
            consistent_circuits = EVCDeploy.check_list_traces(check_circuits)
158
            for circuit in circuits:
159
                circuits[circuit]['consistent'] = consistent_circuits.get(circuit, False) 
160
161
        return JSONResponse(circuits)
162
163
    @rest("/v2/evc/schedule", methods=["GET"])
164 1
    def list_schedules(self, _request: Request) -> JSONResponse:
165 1
        """Endpoint to return all schedules stored for all circuits.
166 1
167 1
        Return a JSON with the following template:
168 1
        [{"schedule_id": <schedule_id>,
169 1
         "circuit_id": <circuit_id>,
170
         "schedule": <schedule object>}]
171 1
        """
172 1
        log.debug("list_schedules /v2/evc/schedule")
173 1
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
174 1
        if not circuits:
175 1
            result = {}
176 1
            status = 200
177 1
            return JSONResponse(result, status_code=status)
178
179
        result = []
180
        status = 200
181
        for circuit in circuits:
182 1
            circuit_scheduler = circuit.get("circuit_scheduler")
183
            if circuit_scheduler:
184 1
                for scheduler in circuit_scheduler:
185 1
                    value = {
186
                        "schedule_id": scheduler.get("id"),
187 1
                        "circuit_id": circuit.get("id"),
188 1
                        "schedule": scheduler,
189
                    }
190 1
                    result.append(value)
191 1
192 1
        log.debug("list_schedules result %s %s", result, status)
193 1
        return JSONResponse(result, status_code=status)
194 1
195 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
196 1
    def get_circuit(self, request: Request) -> JSONResponse:
197 1
        """Endpoint to return a circuit based on id."""
198 1
        circuit_id = request.path_params["circuit_id"]
199 1
        log.debug("get_circuit /v2/evc/%s", circuit_id)
200
        circuit = self.mongo_controller.get_circuit(circuit_id)
201 1
        if not circuit:
202 1
            result = f"circuit_id {circuit_id} not found"
203 1
            log.debug("get_circuit result %s %s", result, 404)
204
            raise HTTPException(404, detail=result)
205
        status = 200
206
        log.debug("get_circuit result %s %s", circuit, status)
207
        return JSONResponse(circuit, status_code=status)
208
209
    @rest("/v2/evc/", methods=["POST"])
210
    @validate_openapi(spec)
211
    def create_circuit(self, request: Request) -> JSONResponse:
212
        """Try to create a new circuit.
213
214
        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
215
        UNI_Z's requested C-VID are available from the interfaces' pools. This
216
        is checked when creating the UNI object.
217
218
        Then, E-Line NApp requests a primary and a backup path to the
219
        Pathfinder NApp using the attributes primary_links and backup_links
220
        submitted via REST
221
222
        # For each link composing paths in #3:
223
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
224
        #  - Using the S-VID obtained, generate abstract flow entries to be
225
        #    sent to FlowManager
226
227
        Push abstract flow entries to FlowManager and FlowManager pushes
228 1
        OpenFlow entries to datapaths
229 1
230
        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
231 1
        creation
232 1
233 1
        Finnaly, notify user of the status of its request.
234 1
        """
235 1
        # Try to create the circuit object
236
        log.debug("create_circuit /v2/evc/")
237 1
        data = get_json_or_400(request, self.controller.loop)
238 1
239 1
        try:
240 1
            evc = self._evc_from_dict(data)
241 1
        except ValueError as exception:
242
            log.debug("create_circuit result %s %s", exception, 400)
243
            raise HTTPException(400, detail=str(exception)) from exception
244
245
        try:
246 1
            check_disabled_component(evc.uni_a, evc.uni_z)
247
        except DisabledSwitch as exception:
248
            log.debug("create_circuit result %s %s", exception, 409)
249
            raise HTTPException(
250
                    409,
251
                    detail=f"Path is not valid: {exception}"
252
                ) from exception
253
254
        if evc.primary_path:
255
            try:
256
                evc.primary_path.is_valid(
257
                    evc.uni_a.interface.switch,
258 1
                    evc.uni_z.interface.switch,
259
                    bool(evc.circuit_scheduler),
260
                )
261
            except InvalidPath as exception:
262
                raise HTTPException(
263
                    400,
264
                    detail=f"primary_path is not valid: {exception}"
265
                ) from exception
266
        if evc.backup_path:
267
            try:
268
                evc.backup_path.is_valid(
269
                    evc.uni_a.interface.switch,
270
                    evc.uni_z.interface.switch,
271
                    bool(evc.circuit_scheduler),
272 1
                )
273 1
            except InvalidPath as exception:
274 1
                raise HTTPException(
275 1
                    400,
276
                    detail=f"backup_path is not valid: {exception}"
277 1
                ) from exception
278 1
279 1
        # verify duplicated evc
280 1
        if self._is_duplicated_evc(evc):
281
            result = "The EVC already exists."
282 1
            log.debug("create_circuit result %s %s", result, 409)
283 1
            raise HTTPException(409, detail=result)
284
285
        try:
286
            evc._validate_has_primary_or_dynamic()
287
        except ValueError as exception:
288 1
            raise HTTPException(400, detail=str(exception)) from exception
289 1
290
        try:
291
            self._use_uni_tags(evc)
292
        except ValueError as exception:
293
            raise HTTPException(400, detail=str(exception)) from exception
294 1
295
        # save circuit
296
        try:
297 1
            evc.sync()
298
        except ValidationError as exception:
299
            raise HTTPException(400, detail=str(exception)) from exception
300 1
301 1
        # store circuit in dictionary
302 1
        self.circuits[evc.id] = evc
303
304
        # Schedule the circuit deploy
305 1
        self.sched.add(evc)
306 1
307 1
        # Circuit has no schedule, deploy now
308 1
        if not evc.circuit_scheduler:
309
            with evc.lock:
310 1
                evc.deploy()
311
312 1
        # Notify users
313 1
        result = {"circuit_id": evc.id}
314 1
        status = 201
315 1
        log.debug("create_circuit result %s %s", result, status)
316 1
        emit_event(self.controller, name="created",
317 1
                   content=map_evc_event_content(evc))
318 1
        return JSONResponse(result, status_code=status)
319 1
320 1
    @staticmethod
321 1
    def _use_uni_tags(evc):
322 1
        uni_a = evc.uni_a
323 1
        try:
324 1
            evc._use_uni_vlan(uni_a)
325
        except ValueError as err:
326 1
            raise err
327 1
        try:
328
            uni_z = evc.uni_z
329
            evc._use_uni_vlan(uni_z)
330
        except ValueError as err:
331 1
            evc.make_uni_vlan_available(uni_a)
332
            raise err
333 1
334 1
    @listen_to('kytos/flow_manager.flow.removed')
335 1
    def on_flow_delete(self, event):
336 1
        """Capture delete messages to keep track when flows got removed."""
337 1
        self.handle_flow_delete(event)
338
339 1
    def handle_flow_delete(self, event):
340 1
        """Keep track when the EVC got flows removed by deriving its cookie."""
341 1
        flow = event.content["flow"]
342
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
343
        if evc:
344
            log.debug("Flow removed in EVC %s", evc.id)
345
            evc.set_flow_removed_at()
346
347 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
348 1
    @validate_openapi(spec)
349 1
    def update(self, request: Request) -> JSONResponse:
350 1
        """Update a circuit based on payload.
351 1
352 1
        The EVC attributes (creation_time, active, current_path,
353 1
        failover_path, _id, archived) can't be updated.
354 1
        """
355 1
        data = get_json_or_400(request, self.controller.loop)
356
        circuit_id = request.path_params["circuit_id"]
357 1
        log.debug("update /v2/evc/%s", circuit_id)
358 1
        try:
359 1
            evc = self.circuits[circuit_id]
360 1
        except KeyError:
361
            result = f"circuit_id {circuit_id} not found"
362 1
            log.debug("update result %s %s", result, 404)
363 1
            raise HTTPException(404, detail=result) from KeyError
364
365
        if evc.archived:
366 1
            result = "Can't update archived EVC"
367
            log.debug("update result %s %s", result, 409)
368 1
            raise HTTPException(409, detail=result)
369 1
370 1
        try:
371 1
            enable, redeploy = evc.update(
372 1
                **self._evc_dict_with_instances(data)
373 1
            )
374 1
        except ValidationError as exception:
375
            raise HTTPException(400, detail=str(exception)) from exception
376
        except ValueError as exception:
377
            log.error(exception)
378
            log.debug("update result %s %s", exception, 400)
379 1
            raise HTTPException(400, detail=str(exception)) from exception
380
        except DisabledSwitch as exception:
381
            log.debug("update result %s %s", exception, 409)
382
            raise HTTPException(
383
                    409,
384
                    detail=f"Path is not valid: {exception}"
385
                ) from exception
386
387
        if evc.is_active():
388 1
            if enable is False:  # disable if active
389 1
                with evc.lock:
390 1
                    evc.remove()
391 1
            elif redeploy is not None:  # redeploy if active
392 1
                with evc.lock:
393
                    evc.remove()
394 1
                    evc.deploy()
395 1
        else:
396
            if enable is True:  # enable if inactive
397 1
                with evc.lock:
398
                    evc.deploy()
399 1
        result = {evc.id: evc.as_dict()}
400 1
        status = 200
401
402
        log.debug("update result %s %s", result, status)
403
        emit_event(self.controller, "updated",
404
                   content=map_evc_event_content(evc, **data))
405
        return JSONResponse(result, status_code=status)
406 1
407 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
408 1
    def delete_circuit(self, request: Request) -> JSONResponse:
409 1
        """Remove a circuit.
410 1
411 1
        First, the flows are removed from the switches, and then the EVC is
412 1
        disabled.
413 1
        """
414
        circuit_id = request.path_params["circuit_id"]
415 1
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
416 1
        try:
417 1
            evc = self.circuits[circuit_id]
418 1
        except KeyError:
419
            result = f"circuit_id {circuit_id} not found"
420 1
            log.debug("delete_circuit result %s %s", result, 404)
421 1
            raise HTTPException(404, detail=result) from KeyError
422 1
423 1
        if evc.archived:
424 1
            result = f"Circuit {circuit_id} already removed"
425 1
            log.debug("delete_circuit result %s %s", result, 404)
426 1
            raise HTTPException(404, detail=result)
427 1
428 1
        log.info("Removing %s", evc)
429 1
        with evc.lock:
430 1
            evc.remove_current_flows()
431 1
            evc.remove_failover_flows(sync=False)
432 1
            evc.deactivate()
433
            evc.disable()
434 1
            self.sched.remove(evc)
435 1
            evc.archive()
436
            evc.remove_uni_tags()
437 1
            evc.sync()
438
        log.info("EVC removed. %s", evc)
439 1
        result = {"response": f"Circuit {circuit_id} removed"}
440 1
        status = 200
441
442 1
        log.debug("delete_circuit result %s %s", result, status)
443 1
        emit_event(self.controller, "deleted",
444 1
                   content=map_evc_event_content(evc))
445
        return JSONResponse(result, status_code=status)
446
447
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
448
    def get_metadata(self, request: Request) -> JSONResponse:
449
        """Get metadata from an EVC."""
450
        circuit_id = request.path_params["circuit_id"]
451
        try:
452
            return (
453 1
                JSONResponse({"metadata": self.circuits[circuit_id].metadata})
454 1
            )
455 1
        except KeyError as error:
456
            raise HTTPException(
457 1
                404,
458 1
                detail=f"circuit_id {circuit_id} not found."
459
            ) from error
460 1
461 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
462 1
    @validate_openapi(spec)
463 1
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
464 1
        """Add metadata to a bulk of EVCs."""
465 1
        data = get_json_or_400(request, self.controller.loop)
466 1
        circuit_ids = data.pop("circuit_ids")
467 1
468 1
        self.mongo_controller.update_evcs(circuit_ids, data, "add")
469
470 1
        fail_evcs = []
471 1
        for _id in circuit_ids:
472 1
            try:
473
                evc = self.circuits[_id]
474 1
                evc.extend_metadata(data)
475 1
            except KeyError:
476 1
                fail_evcs.append(_id)
477
478 1
        if fail_evcs:
479 1
            raise HTTPException(404, detail=fail_evcs)
480 1
        return JSONResponse("Operation successful", status_code=201)
481
482 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
483 1
    @validate_openapi(spec)
484 1
    def add_metadata(self, request: Request) -> JSONResponse:
485 1
        """Add metadata to an EVC."""
486
        circuit_id = request.path_params["circuit_id"]
487
        metadata = get_json_or_400(request, self.controller.loop)
488
        if not isinstance(metadata, dict):
489
            raise HTTPException(400, "Invalid metadata value: {metadata}")
490 1
        try:
491 1
            evc = self.circuits[circuit_id]
492 1
        except KeyError as error:
493
            raise HTTPException(
494 1
                404,
495 1
                detail=f"circuit_id {circuit_id} not found."
496 1
            ) from error
497
498 1
        evc.extend_metadata(metadata)
499 1
        evc.sync()
500 1
        return JSONResponse("Operation successful", status_code=201)
501 1
502 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
503 1
    @validate_openapi(spec)
504 1
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
505 1
        """Delete metada from a bulk of EVCs"""
506 1
        data = get_json_or_400(request, self.controller.loop)
507 1
        key = request.path_params["key"]
508
        circuit_ids = data.pop("circuit_ids")
509
        self.mongo_controller.update_evcs(circuit_ids, {key: ""}, "del")
510
511 1
        fail_evcs = []
512
        for _id in circuit_ids:
513 1
            try:
514
                evc = self.circuits[_id]
515 1
                evc.remove_metadata(key)
516 1
            except KeyError:
517
                fail_evcs.append(_id)
518 1
519 1
        if fail_evcs:
520 1
            raise HTTPException(404, detail=fail_evcs)
521 1
        return JSONResponse("Operation successful")
522 1
523 1
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
524
    def delete_metadata(self, request: Request) -> JSONResponse:
525
        """Delete metadata from an EVC."""
526
        circuit_id = request.path_params["circuit_id"]
527
        key = request.path_params["key"]
528 1
        try:
529 1
            evc = self.circuits[circuit_id]
530 1
        except KeyError as error:
531
            raise HTTPException(
532 1
                404,
533 1
                detail=f"circuit_id {circuit_id} not found."
534
            ) from error
535 1
536 1
        evc.remove_metadata(key)
537 1
        evc.sync()
538 1
        return JSONResponse("Operation successful")
539 1
540 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
541
    def redeploy(self, request: Request) -> JSONResponse:
542
        """Endpoint to force the redeployment of an EVC."""
543
        circuit_id = request.path_params["circuit_id"]
544 1
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
545 1
        try:
546 1
            evc = self.circuits[circuit_id]
547 1
        except KeyError:
548 1
            raise HTTPException(
549 1
                404,
550
                detail=f"circuit_id {circuit_id} not found"
551 1
            ) from KeyError
552 1
        if evc.is_enabled():
553
            with evc.lock:
554 1
                evc.remove_current_flows()
555
                evc.deploy()
556 1
            result = {"response": f"Circuit {circuit_id} redeploy received."}
557 1
            status = 202
558 1
        else:
559
            result = {"response": f"Circuit {circuit_id} is disabled."}
560
            status = 409
561
562
        return JSONResponse(result, status_code=status)
563
564
    @rest("/v2/evc/schedule/", methods=["POST"])
565
    @validate_openapi(spec)
566
    def create_schedule(self, request: Request) -> JSONResponse:
567
        """
568
        Create a new schedule for a given circuit.
569
570
        This service do no check if there are conflicts with another schedule.
571
        Payload example:
572
            {
573
              "circuit_id":"aa:bb:cc",
574 1
              "schedule": {
575 1
                "date": "2019-08-07T14:52:10.967Z",
576 1
                "interval": "string",
577 1
                "frequency": "1 * * * *",
578
                "action": "create"
579
              }
580 1
            }
581
        """
582
        log.debug("create_schedule /v2/evc/schedule/")
583 1
        data = get_json_or_400(request, self.controller.loop)
584
        circuit_id = data["circuit_id"]
585
        schedule_data = data["schedule"]
586 1
587 1
        # Get EVC from circuits buffer
588 1
        circuits = self._get_circuits_buffer()
589 1
590
        # get the circuit
591 1
        evc = circuits.get(circuit_id)
592 1
593 1
        # get the circuit
594 1
        if not evc:
595
            result = f"circuit_id {circuit_id} not found"
596
            log.debug("create_schedule result %s %s", result, 404)
597 1
            raise HTTPException(404, detail=result)
598
        # Can not modify circuits deleted and archived
599
        if evc.archived:
600 1
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
601 1
            log.debug("create_schedule result %s %s", result, 409)
602
            raise HTTPException(409, detail=result)
603
604 1
        # new schedule from dict
605
        new_schedule = CircuitSchedule.from_dict(schedule_data)
606
607 1
        # If there is no schedule, create the list
608
        if not evc.circuit_scheduler:
609
            evc.circuit_scheduler = []
610 1
611
        # Add the new schedule
612 1
        evc.circuit_scheduler.append(new_schedule)
613 1
614
        # Add schedule job
615 1
        self.sched.add_circuit_job(evc, new_schedule)
616 1
617
        # save circuit to mongodb
618 1
        evc.sync()
619 1
620 1
        result = new_schedule.as_dict()
621
        status = 201
622
623
        log.debug("create_schedule result %s %s", result, status)
624
        return JSONResponse(result, status_code=status)
625
626
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
627
    @validate_openapi(spec)
628
    def update_schedule(self, request: Request) -> JSONResponse:
629
        """Update a schedule.
630
631
        Change all attributes from the given schedule from a EVC circuit.
632
        The schedule ID is preserved as default.
633 1
        Payload example:
634 1
            {
635 1
              "date": "2019-08-07T14:52:10.967Z",
636
              "interval": "string",
637
              "frequency": "1 * * *",
638 1
              "action": "create"
639
            }
640
        """
641 1
        data = get_json_or_400(request, self.controller.loop)
642 1
        schedule_id = request.path_params["schedule_id"]
643 1
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)
644 1
645 1
        # Try to find a circuit schedule
646 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
647 1
648 1
        # Can not modify circuits deleted and archived
649
        if not found_schedule:
650 1
            result = f"schedule_id {schedule_id} not found"
651 1
            log.debug("update_schedule result %s %s", result, 404)
652
            raise HTTPException(404, detail=result)
653 1
        if evc.archived:
654
            result = f"Circuit {evc.id} is archived. Update is forbidden."
655 1
            log.debug("update_schedule result %s %s", result, 409)
656
            raise HTTPException(409, detail=result)
657
658 1
        new_schedule = CircuitSchedule.from_dict(data)
659
        new_schedule.id = found_schedule.id
660 1
        # Remove the old schedule
661
        evc.circuit_scheduler.remove(found_schedule)
662 1
        # Append the modified schedule
663
        evc.circuit_scheduler.append(new_schedule)
664 1
665 1
        # Cancel all schedule jobs
666
        self.sched.cancel_job(found_schedule.id)
667 1
        # Add the new circuit schedule
668 1
        self.sched.add_circuit_job(evc, new_schedule)
669
        # Save EVC to mongodb
670 1
        evc.sync()
671 1
672
        result = new_schedule.as_dict()
673
        status = 200
674
675
        log.debug("update_schedule result %s %s", result, status)
676
        return JSONResponse(result, status_code=status)
677
678 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
679 1
    def delete_schedule(self, request: Request) -> JSONResponse:
680 1
        """Remove a circuit schedule.
681
682
        Remove the Schedule from EVC.
683 1
        Remove the Schedule from cron job.
684 1
        Save the EVC to the Storehouse.
685 1
        """
686 1
        schedule_id = request.path_params["schedule_id"]
687
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
688 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
689 1
690 1
        # Can not modify circuits deleted and archived
691 1
        if not found_schedule:
692
            result = f"schedule_id {schedule_id} not found"
693
            log.debug("delete_schedule result %s %s", result, 404)
694 1
            raise HTTPException(404, detail=result)
695
696
        if evc.archived:
697 1
            result = f"Circuit {evc.id} is archived. Update is forbidden."
698
            log.debug("delete_schedule result %s %s", result, 409)
699 1
            raise HTTPException(409, detail=result)
700
701 1
        # Remove the old schedule
702 1
        evc.circuit_scheduler.remove(found_schedule)
703
704 1
        # Cancel all schedule jobs
705 1
        self.sched.cancel_job(found_schedule.id)
706
        # Save EVC to mongodb
707 1
        evc.sync()
708
709
        result = "Schedule removed"
710
        status = 200
711
712
        log.debug("delete_schedule result %s %s", result, status)
713
        return JSONResponse(result, status_code=status)
714
715
    def _is_duplicated_evc(self, evc):
716
        """Verify if the circuit given is duplicated with the stored evcs.
717 1
718 1
        Args:
719 1
            evc (EVC): circuit to be analysed.
720 1
721
        Returns:
722 1
            boolean: True if the circuit is duplicated, otherwise False.
723 1
724
        """
725
        for circuit in tuple(self.circuits.values()):
726
            if not circuit.archived and circuit.shares_uni(evc):
727 1
                return True
728
        return False
729 1
730 1
    @listen_to("kytos/topology.link_up")
731 1
    def on_link_up(self, event):
732 1
        """Change circuit when link is up or end_maintenance."""
733 1
        self.handle_link_up(event)
734
735 1
    def handle_link_up(self, event):
736 1
        """Change circuit when link is up or end_maintenance."""
737
        log.info("Event handle_link_up %s", event.content["link"])
738
        for evc in self.get_evcs_by_svc_level():
739
            if evc.is_enabled() and not evc.archived:
740 1
                with evc.lock:
741
                    evc.handle_link_up(event.content["link"])
742 1
743 1
    # Possibly replace this with interruptions?
744
    @listen_to(
745
        '.*.switch.interface.(link_up|link_down|created|deleted)',
746
        pool='dynamic_single'
747
    )
748 1
    def on_interface_link_change(self, event: KytosEvent):
749 1
        """
750 1
        Handler for interface link_up and link_down events
751 1
        """
752 1
        with self._lock:
753
            _, _, event_type = event.name.rpartition('.')
754
            iface = event.content.get("interface")
755
            if event_type in ('link_up', 'created'):
756 1
                self.handle_interface_link_up(iface)
757 1
            elif event_type in ('link_down', 'deleted'):
758
                self.handle_interface_link_down(iface)
759
760
    def handle_interface_link_up(self, interface):
761 1
        """
762
        Handler for interface link_up events
763 1
        """
764 1
        for evc in self.get_evcs_by_svc_level():
765 1
            log.info("Event handle_interface_link_up %s", interface)
766 1
            evc.handle_interface_link_up(
767 1
                interface
768 1
            )
769 1
770 1
    def handle_interface_link_down(self, interface):
771
        """
772
        Handler for interface link_down events
773 1
        """
774
        for evc in self.get_evcs_by_svc_level():
775
            log.info("Event handle_interface_link_down %s", interface)
776
            evc.handle_interface_link_down(
777 1
                interface
778 1
            )
779 1
780 1
    @listen_to("kytos/topology.link_down")
781
    def on_link_down(self, event):
782 1
        """Change circuit when link is down or under_mantenance."""
783 1
        self.handle_link_down(event)
784 1
785
    def handle_link_down(self, event):  # pylint: disable=too-many-branches
786
        """Change circuit when link is down or under_mantenance."""
787 1
        link = event.content["link"]
788 1
        log.info("Event handle_link_down %s", link)
789 1
        switch_flows = {}
790 1
        evcs_with_failover = []
791 1
        evcs_normal = []
792 1
        check_failover = []
793
        for evc in self.get_evcs_by_svc_level():
794 1
            if evc.is_affected_by_link(link):
795
                # if there is no failover path, handles link down the
796 1
                # tradditional way
797 1
                if (
798 1
                    not getattr(evc, 'failover_path', None) or
799 1
                    evc.is_failover_path_affected_by_link(link)
800 1
                ):
801
                    evcs_normal.append(evc)
802
                    continue
803
                try:
804
                    dpid_flows = evc.get_failover_flows()
805
                # pylint: disable=broad-except
806
                except Exception:
807
                    err = traceback.format_exc().replace("\n", ", ")
808
                    log.error(
809 1
                        f"Ignore Failover path for {evc} due to error: {err}"
810 1
                    )
811 1
                    evcs_normal.append(evc)
812 1
                    continue
813 1
                for dpid, flows in dpid_flows.items():
814
                    switch_flows.setdefault(dpid, [])
815 1
                    switch_flows[dpid].extend(flows)
816 1
                evcs_with_failover.append(evc)
817 1
            else:
818 1
                check_failover.append(evc)
819 1
820 1
        while switch_flows:
821 1
            offset = settings.BATCH_SIZE or None
822
            switches = list(switch_flows.keys())
823 1
            for dpid in switches:
824
                emit_event(
825
                    self.controller,
826
                    context="kytos.flow_manager",
827 1
                    name="flows.install",
828 1
                    content={
829
                        "dpid": dpid,
830
                        "flow_dict": {"flows": switch_flows[dpid][:offset]},
831
                    }
832
                )
833
                if offset is None or offset >= len(switch_flows[dpid]):
834
                    del switch_flows[dpid]
835
                    continue
836
                switch_flows[dpid] = switch_flows[dpid][offset:]
837
            time.sleep(settings.BATCH_INTERVAL)
838 1
839 1
        for evc in evcs_with_failover:
840 1
            with evc.lock:
841 1
                old_path = evc.current_path
842
                evc.current_path = evc.failover_path
843 1
                evc.failover_path = old_path
844 1
                evc.sync()
845
            emit_event(self.controller, "redeployed_link_down",
846
                       content=map_evc_event_content(evc))
847
            log.info(
848 1
                f"{evc} redeployed with failover due to link down {link.id}"
849
            )
850 1
851 1
        for evc in evcs_normal:
852 1
            emit_event(
853 1
                self.controller,
854 1
                "evc_affected_by_link_down",
855 1
                content={"link_id": link.id} | map_evc_event_content(evc)
856 1
            )
857 1
858 1
        # After handling the hot path, check if new failover paths are needed.
859 1
        # Note that EVCs affected by link down will generate a KytosEvent for
860 1
        # deployed|redeployed, which will trigger the failover path setup.
861
        # Thus, we just need to further check the check_failover list
862
        for evc in check_failover:
863 1
            if evc.is_failover_path_affected_by_link(link):
864 1
                with evc.lock:
865
                    evc.setup_failover_path()
866
867
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
868 1
    def on_evc_affected_by_link_down(self, event):
869
        """Change circuit when link is down or under_mantenance."""
870 1
        self.handle_evc_affected_by_link_down(event)
871 1
872 1
    def handle_evc_affected_by_link_down(self, event):
873 1
        """Change circuit when link is down or under_mantenance."""
874 1
        evc = self.circuits.get(event.content["evc_id"])
875
        link_id = event.content['link_id']
876 1
        if not evc:
877 1
            return
878
        with evc.lock:
879
            result = evc.handle_link_down()
880
        event_name = "error_redeploy_link_down"
881 1
        if result:
882
            log.info(f"{evc} redeployed due to link down {link_id}")
883 1
            event_name = "redeployed_link_down"
884 1
        emit_event(self.controller, event_name,
885 1
                   content=map_evc_event_content(evc))
886 1
887
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
888 1
    def on_evc_deployed(self, event):
889
        """Handle EVC deployed|redeployed_link_down."""
890 1
        self.handle_evc_deployed(event)
891 1
892 1
    def handle_evc_deployed(self, event):
893 1
        """Setup failover path on evc deployed."""
894
        evc = self.circuits.get(event.content["evc_id"])
895
        if not evc:
896 1
            return
897
        with evc.lock:
898 1
            evc.setup_failover_path()
899 1
900 1
    @listen_to("kytos/topology.topology_loaded")
901 1
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
902 1
        """Load EVCs once the topology is available."""
903 1
        self.load_all_evcs()
904 1
905
    def load_all_evcs(self):
906 1
        """Try to load all EVCs on startup."""
907 1
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
908
        for circuit_id, circuit in circuits:
909
            if circuit_id not in self.circuits:
910
                self._load_evc(circuit)
911 1
912
    def _load_evc(self, circuit_dict):
913 1
        """Load one EVC from mongodb to memory."""
914 1
        try:
915 1
            evc = self._evc_from_dict(circuit_dict)
916
        except ValueError as exception:
917 1
            log.error(
918 1
                f"Could not load EVC: dict={circuit_dict} error={exception}"
919 1
            )
920 1
            return None
921
922 1
        if evc.archived:
923
            return None
924
925
        self.circuits.setdefault(evc.id, evc)
926
        self.sched.add(evc)
927 1
        return evc
928 1
929
    @listen_to("kytos/flow_manager.flow.error")
930
    def on_flow_mod_error(self, event):
931 1
        """Handle flow mod errors related to an EVC."""
932 1
        self.handle_flow_mod_error(event)
933 1
934 1
    def handle_flow_mod_error(self, event):
935 1
        """Handle flow mod errors related to an EVC."""
936 1
        flow = event.content["flow"]
937
        command = event.content.get("error_command")
938 1
        if command != "add":
939 1
            return
940 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
941 1
        if evc:
942
            with evc.lock:
943
                evc.remove_current_flows()
944
945
    def _evc_dict_with_instances(self, evc_dict):
946
        """Convert some dict values to instance of EVC classes.
947
948
        This method will convert: [UNI, Link]
949 1
        """
950 1
        data = evc_dict.copy()  # Do not modify the original dict
951
        for attribute, value in data.items():
952
            # Get multiple attributes.
953
            # Ex: uni_a, uni_z
954
            if "uni" in attribute:
955
                try:
956
                    data[attribute] = self._uni_from_dict(value)
957 1
                except ValueError as exception:
958 1
                    result = "Error creating UNI: Invalid value"
959
                    raise ValueError(result) from exception
960
961
            if attribute == "circuit_scheduler":
962 1
                data[attribute] = []
963
                for schedule in value:
964 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
965 1
966 1
            # Get multiple attributes.
967 1
            # Ex: primary_links,
968
            #     backup_links,
969 1
            #     current_links_cache,
970
            #     primary_links_cache,
971 1
            #     backup_links_cache
972 1
            if "links" in attribute:
973
                data[attribute] = [
974 1
                    self._link_from_dict(link) for link in value
975 1
                ]
976 1
977 1
            # Ex: current_path,
978
            #     primary_path,
979
            #     backup_path
980
            if "path" in attribute and attribute != "dynamic_backup_path":
981 1
                data[attribute] = Path(
982 1
                    [self._link_from_dict(link) for link in value]
983 1
                )
984 1
985 1
        return data
986 1
987 1
    def _evc_from_dict(self, evc_dict):
988 1
        data = self._evc_dict_with_instances(evc_dict)
989
        data["table_group"] = self.table_group
990 1
        return EVC(self.controller, **data)
991 1
992
    def _uni_from_dict(self, uni_dict):
993 1
        """Return a UNI object from python dict."""
994
        if uni_dict is None:
995 1
            return False
996
997 1
        interface_id = uni_dict.get("interface_id")
998 1
        interface = self.controller.get_interface_by_id(interface_id)
999
        if interface is None:
1000 1
            result = (
1001 1
                "Error creating UNI:"
1002 1
                + f"Could not instantiate interface {interface_id}"
1003 1
            )
1004 1
            raise ValueError(result) from ValueError
1005 1
        tag_convert = {1: "vlan"}
1006
        tag_dict = uni_dict.get("tag", None)
1007
        if tag_dict:
1008
            tag_type = tag_dict.get("tag_type")
1009 1
            tag_type = tag_convert.get(tag_type, tag_type)
1010 1
            tag_value = tag_dict.get("value")
1011 1
            tag = TAG(tag_type, tag_value)
1012
        else:
1013 1
            tag = None
1014 1
        uni = UNI(interface, tag)
1015 1
1016 1
        return uni
1017
1018
    def _link_from_dict(self, link_dict):
1019 1
        """Return a Link object from python dict."""
1020 1
        id_a = link_dict.get("endpoint_a").get("id")
1021
        id_b = link_dict.get("endpoint_b").get("id")
1022 1
1023
        endpoint_a = self.controller.get_interface_by_id(id_a)
1024
        endpoint_b = self.controller.get_interface_by_id(id_b)
1025
        if not endpoint_a:
1026
            error_msg = f"Could not get interface endpoint_a id {id_a}"
1027
            raise ValueError(error_msg)
1028
        if not endpoint_b:
1029 1
            error_msg = f"Could not get interface endpoint_b id {id_b}"
1030 1
            raise ValueError(error_msg)
1031 1
1032
        link = Link(endpoint_a, endpoint_b)
1033
        if "metadata" in link_dict:
1034 1
            link.extend_metadata(link_dict.get("metadata"))
1035 1
1036 1
        s_vlan = link.get_metadata("s_vlan")
1037 1
        if s_vlan:
1038 1
            tag = TAG.from_dict(s_vlan)
1039 1
            if tag is False:
1040 1
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
1041 1
                raise ValueError(error_msg)
1042 1
            link.update_metadata("s_vlan", tag)
1043
        return link
1044 1
1045
    def _find_evc_by_schedule_id(self, schedule_id):
1046
        """
1047
        Find an EVC and CircuitSchedule based on schedule_id.
1048
1049
        :param schedule_id: Schedule ID
1050 1
        :return: EVC and Schedule
1051
        """
1052 1
        circuits = self._get_circuits_buffer()
1053 1
        found_schedule = None
1054 1
        evc = None
1055 1
1056 1
        # pylint: disable=unused-variable
1057
        for c_id, circuit in circuits.items():
1058
            for schedule in circuit.circuit_scheduler:
1059 1
                if schedule.id == schedule_id:
1060 1
                    found_schedule = schedule
1061
                    evc = circuit
1062 1
                    break
1063 1
            if found_schedule:
1064
                break
1065 1
        return evc, found_schedule
1066 1
1067 1
    def _get_circuits_buffer(self):
1068
        """
1069
        Return the circuit buffer.
1070 1
1071 1
        If the buffer is empty, try to load data from mongodb.
1072 1
        """
1073 1
        if not self.circuits:
1074 1
            # Load circuits from mongodb to buffer
1075
            circuits = self.mongo_controller.get_circuits()['circuits']
1076
            for c_id, circuit in circuits.items():
1077
                evc = self._evc_from_dict(circuit)
1078
                self.circuits[c_id] = evc
1079
        return self.circuits
1080
1081
    # pylint: disable=attribute-defined-outside-init
1082
    @alisten_to("kytos/of_multi_table.enable_table")
1083
    async def on_table_enabled(self, event):
1084
        """Handle a recently table enabled."""
1085
        table_group = event.content.get("mef_eline", None)
1086
        if not table_group:
1087
            return
1088
        for group in table_group:
1089
            if group not in settings.TABLE_GROUP_ALLOWED:
1090
                log.error(f'The table group "{group}" is not allowed for '
1091
                          f'mef_eline. Allowed table groups are '
1092
                          f'{settings.TABLE_GROUP_ALLOWED}')
1093
                return
1094
        self.table_group.update(table_group)
1095
        content = {"group_table": self.table_group}
1096
        name = "kytos/mef_eline.enable_table"
1097
        await aemit_event(self.controller, name, content)
1098