Test Failed
Pull Request — master (#383)
by
unknown
03:38
created

build.main.Main.should_be_checked()   A

Complexity

Conditions 2

Size

Total Lines 21
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 2.0116

Importance

Changes 0
Metric Value
cc 2
eloc 16
nop 2
dl 0
loc 21
ccs 6
cts 7
cp 0.8571
crap 2.0116
rs 9.6
c 0
b 0
f 0
1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from threading import Lock
10
11 1
from pydantic import ValidationError
12
13 1
from kytos.core import KytosNApp, log, rest
14 1
from kytos.core.events import KytosEvent
15
from kytos.core.helpers import (alisten_to, listen_to, load_spec,
16 1
                                validate_openapi)
17 1
from kytos.core.interface import TAG, UNI
18 1
from kytos.core.link import Link
19
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
20 1
                                 get_json_or_400)
21 1
from napps.kytos.mef_eline import controllers, settings
22 1
from napps.kytos.mef_eline.exceptions import DisabledSwitch, InvalidPath
23
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
24 1
                                          Path)
25 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
26
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
27
                                         emit_event, map_evc_event_content)
28
29
30 1
# pylint: disable=too-many-public-methods
31
class Main(KytosNApp):
32
    """Main class of amlight/mef_eline NApp.
33
34
    This class is the entry point for this napp.
35
    """
36 1
37
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
38 1
39
    def setup(self):
40
        """Replace the '__init__' method for the KytosNApp subclass.
41
42
        The setup method is automatically called by the controller when your
43
        application is loaded.
44
45
        So, if you have any setup routine, insert it here.
46
        """
47 1
        # object used to scheduler circuit events
48
        self.sched = Scheduler()
49
50 1
        # object to save and load circuits
51 1
        self.mongo_controller = self.get_eline_controller()
52
        self.mongo_controller.bootstrap_indexes()
53
54 1
        # set the controller that will manager the dynamic paths
55
        DynamicPathManager.set_controller(self.controller)
56
57
        # dictionary of EVCs created. It acts as a circuit buffer.
58 1
        # Every create/update/delete must be synced to mongodb.
59
        self.circuits = {}
60 1
61 1
        self.table_group = {"epl": 0, "evpl": 0}
62 1
        self._lock = Lock()
63
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)
64 1
65 1
        self.load_all_evcs()
66
        self._topology_updated_at = None
67 1
68
    def get_evcs_by_svc_level(self) -> list:
69
        """Get circuits sorted by desc service level and asc creation_time.
70
71
        In the future, as more ops are offloaded it should be get from the DB.
72 1
        """
73
        return sorted(self.circuits.values(),
74
                      key=lambda x: (-x.service_level, x.creation_time))
75 1
76 1
    @staticmethod
77
    def get_eline_controller():
78
        """Return the ELineController instance."""
79
        return controllers.ELineController()
80 1
81
    def execute(self):
82 1
        """Execute once when the napp is running."""
83 1
        if self._lock.locked():
84 1
            return
85 1
        log.debug("Starting consistency routine")
86 1
        with self._lock:
87 1
            self.execute_consistency()
88
        log.debug("Finished consistency routine")
89 1
90
    def should_be_checked(self, circuit: EVC):
91
        "Verify if the circuit meets the necessary conditions to be checked"
92 1
        # pylint: disable=too-many-boolean-expressions
93
        log.info(f'Checking if {circuit} should be checked')
94
        conditions = (
95
            circuit.is_enabled(),
96
            not circuit.is_active(),
97
            not circuit.lock.locked(),
98
            not circuit.has_recent_removed_flow(),
99
            not circuit.is_recent_updated(),
100
            circuit.are_unis_active(self.controller.switches),
101
            # if a inter-switch EVC does not have current_path, it does not
102
            # make sense to run sdntrace on it
103 1
            circuit.is_intra_switch() or circuit.current_path,
104
        )
105
        log.info(f'{conditions}')
106 1
        if all(conditions):
107
            log.info(f'Running consistency check on {circuit}')
108 1
            return True
109 1
        log.info(f'Not running consistency check on {circuit}')
110 1
        return False
111 1
112 1
    def execute_consistency(self):
113 1
        """Execute consistency routine."""
114 1
        circuits_to_check = []
115 1
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
116 1
        for circuit in self.get_evcs_by_svc_level():
117 1
            stored_circuits.pop(circuit.id, None)
118 1
            if self.should_be_checked(circuit):
119 1
                circuits_to_check.append(circuit)
120 1
        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
121 1
        for circuit in circuits_to_check:
122 1
            is_checked = circuits_checked.get(circuit.id)
123
            if is_checked:
124 1
                circuit.execution_rounds = 0
125 1
                log.info(f"{circuit} enabled but inactive - activating")
126 1
                with circuit.lock:
127 1
                    circuit.activate()
128 1
                    circuit.sync()
129 1
            else:
130 1
                circuit.execution_rounds += 1
131 1
                if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
132
                    log.info(f"{circuit} enabled but inactive - redeploy")
133 1
                    with circuit.lock:
134
                        circuit.deploy()
135
        for circuit_id in stored_circuits:
136
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
137
            self._load_evc(stored_circuits[circuit_id])
138
139 1
    def shutdown(self):
140 1
        """Execute when your napp is unloaded.
141
142
        If you have some cleanup procedure, insert it here.
143
        """
144
145
    @rest("/v2/evc/", methods=["GET"])
146 1
    def list_circuits(self, request: Request) -> JSONResponse:
147 1
        """Endpoint to return circuits stored.
148 1
149 1
        archive query arg if defined (not null) will be filtered
150 1
        accordingly, by default only non archived evcs will be listed
151
        """
152 1
        log.debug("list_circuits /v2/evc")
153 1
        args = request.query_params
154
        archived = args.get("archived", "false").lower()
155 1
        args = {k: v for k, v in args.items() if k not in {"archived"}}
156 1
        circuits = self.mongo_controller.get_circuits(archived=archived,
157
                                                      metadata=args)
158
        circuits = circuits['circuits']
159
        return JSONResponse(circuits)
160
161
    @rest("/v2/evc/schedule", methods=["GET"])
162
    def list_schedules(self, _request: Request) -> JSONResponse:
163
        """Endpoint to return all schedules stored for all circuits.
164 1
165 1
        Return a JSON with the following template:
166 1
        [{"schedule_id": <schedule_id>,
167 1
         "circuit_id": <circuit_id>,
168 1
         "schedule": <schedule object>}]
169 1
        """
170
        log.debug("list_schedules /v2/evc/schedule")
171 1
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
172 1
        if not circuits:
173 1
            result = {}
174 1
            status = 200
175 1
            return JSONResponse(result, status_code=status)
176 1
177 1
        result = []
178
        status = 200
179
        for circuit in circuits:
180
            circuit_scheduler = circuit.get("circuit_scheduler")
181
            if circuit_scheduler:
182 1
                for scheduler in circuit_scheduler:
183
                    value = {
184 1
                        "schedule_id": scheduler.get("id"),
185 1
                        "circuit_id": circuit.get("id"),
186
                        "schedule": scheduler,
187 1
                    }
188 1
                    result.append(value)
189
190 1
        log.debug("list_schedules result %s %s", result, status)
191 1
        return JSONResponse(result, status_code=status)
192 1
193 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
194 1
    def get_circuit(self, request: Request) -> JSONResponse:
195 1
        """Endpoint to return a circuit based on id."""
196 1
        circuit_id = request.path_params["circuit_id"]
197 1
        log.debug("get_circuit /v2/evc/%s", circuit_id)
198 1
        circuit = self.mongo_controller.get_circuit(circuit_id)
199 1
        if not circuit:
200
            result = f"circuit_id {circuit_id} not found"
201 1
            log.debug("get_circuit result %s %s", result, 404)
202 1
            raise HTTPException(404, detail=result)
203 1
        status = 200
204
        log.debug("get_circuit result %s %s", circuit, status)
205
        return JSONResponse(circuit, status_code=status)
206
207
    @rest("/v2/evc/", methods=["POST"])
208
    @validate_openapi(spec)
209
    def create_circuit(self, request: Request) -> JSONResponse:
210
        """Try to create a new circuit.
211
212
        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
213
        UNI_Z's requested C-VID are available from the interfaces' pools. This
214
        is checked when creating the UNI object.
215
216
        Then, E-Line NApp requests a primary and a backup path to the
217
        Pathfinder NApp using the attributes primary_links and backup_links
218
        submitted via REST
219
220
        # For each link composing paths in #3:
221
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
222
        #  - Using the S-VID obtained, generate abstract flow entries to be
223
        #    sent to FlowManager
224
225
        Push abstract flow entries to FlowManager and FlowManager pushes
226
        OpenFlow entries to datapaths
227
228 1
        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
229 1
        creation
230
231 1
        Finnaly, notify user of the status of its request.
232 1
        """
233 1
        # Try to create the circuit object
234 1
        log.debug("create_circuit /v2/evc/")
235 1
        data = get_json_or_400(request, self.controller.loop)
236
237 1
        try:
238 1
            evc = self._evc_from_dict(data)
239 1
        except ValueError as exception:
240 1
            log.debug("create_circuit result %s %s", exception, 400)
241 1
            raise HTTPException(400, detail=str(exception)) from exception
242
243
        try:
244
            check_disabled_component(evc.uni_a, evc.uni_z)
245
        except DisabledSwitch as exception:
246 1
            log.debug("create_circuit result %s %s", exception, 409)
247
            raise HTTPException(
248
                    409,
249
                    detail=f"Path is not valid: {exception}"
250
                ) from exception
251
252
        if evc.primary_path:
253
            try:
254
                evc.primary_path.is_valid(
255
                    evc.uni_a.interface.switch,
256
                    evc.uni_z.interface.switch,
257
                    bool(evc.circuit_scheduler),
258 1
                )
259
            except InvalidPath as exception:
260
                raise HTTPException(
261
                    400,
262
                    detail=f"primary_path is not valid: {exception}"
263
                ) from exception
264
        if evc.backup_path:
265
            try:
266
                evc.backup_path.is_valid(
267
                    evc.uni_a.interface.switch,
268
                    evc.uni_z.interface.switch,
269
                    bool(evc.circuit_scheduler),
270
                )
271
            except InvalidPath as exception:
272 1
                raise HTTPException(
273 1
                    400,
274 1
                    detail=f"backup_path is not valid: {exception}"
275 1
                ) from exception
276
277 1
        # verify duplicated evc
278 1
        if self._is_duplicated_evc(evc):
279 1
            result = "The EVC already exists."
280 1
            log.debug("create_circuit result %s %s", result, 409)
281
            raise HTTPException(409, detail=result)
282 1
283 1
        try:
284
            evc._validate_has_primary_or_dynamic()
285
        except ValueError as exception:
286
            raise HTTPException(400, detail=str(exception)) from exception
287
288 1
        try:
289 1
            self._use_uni_tags(evc)
290
        except ValueError as exception:
291
            raise HTTPException(400, detail=str(exception)) from exception
292
293
        # save circuit
294 1
        try:
295
            evc.sync()
296
        except ValidationError as exception:
297 1
            raise HTTPException(400, detail=str(exception)) from exception
298
299
        # store circuit in dictionary
300 1
        self.circuits[evc.id] = evc
301 1
302 1
        # Schedule the circuit deploy
303
        self.sched.add(evc)
304
305 1
        # Circuit has no schedule, deploy now
306 1
        if not evc.circuit_scheduler:
307 1
            with evc.lock:
308 1
                evc.deploy()
309
310 1
        # Notify users
311
        result = {"circuit_id": evc.id}
312 1
        status = 201
313 1
        log.debug("create_circuit result %s %s", result, status)
314 1
        emit_event(self.controller, name="created",
315 1
                   content=map_evc_event_content(evc))
316 1
        return JSONResponse(result, status_code=status)
317 1
318 1
    @staticmethod
319 1
    def _use_uni_tags(evc):
320 1
        uni_a = evc.uni_a
321 1
        try:
322 1
            evc._use_uni_vlan(uni_a)
323 1
        except ValueError as err:
324 1
            raise err
325
        try:
326 1
            uni_z = evc.uni_z
327 1
            evc._use_uni_vlan(uni_z)
328
        except ValueError as err:
329
            evc.make_uni_vlan_available(uni_a)
330
            raise err
331 1
332
    @listen_to('kytos/flow_manager.flow.removed')
333 1
    def on_flow_delete(self, event):
334 1
        """Capture delete messages to keep track when flows got removed."""
335 1
        self.handle_flow_delete(event)
336 1
337 1
    def handle_flow_delete(self, event):
338
        """Keep track when the EVC got flows removed by deriving its cookie."""
339 1
        flow = event.content["flow"]
340 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
341 1
        if evc:
342
            log.debug("Flow removed in EVC %s", evc.id)
343
            evc.set_flow_removed_at()
344
345
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
346
    @validate_openapi(spec)
347 1
    def update(self, request: Request) -> JSONResponse:
348 1
        """Update a circuit based on payload.
349 1
350 1
        The EVC attributes (creation_time, active, current_path,
351 1
        failover_path, _id, archived) can't be updated.
352 1
        """
353 1
        data = get_json_or_400(request, self.controller.loop)
354 1
        circuit_id = request.path_params["circuit_id"]
355 1
        log.debug("update /v2/evc/%s", circuit_id)
356
        try:
357 1
            evc = self.circuits[circuit_id]
358 1
        except KeyError:
359 1
            result = f"circuit_id {circuit_id} not found"
360 1
            log.debug("update result %s %s", result, 404)
361
            raise HTTPException(404, detail=result) from KeyError
362 1
363 1
        if evc.archived:
364
            result = "Can't update archived EVC"
365
            log.debug("update result %s %s", result, 409)
366 1
            raise HTTPException(409, detail=result)
367
368 1
        try:
369 1
            enable, redeploy = evc.update(
370 1
                **self._evc_dict_with_instances(data)
371 1
            )
372 1
        except ValidationError as exception:
373 1
            raise HTTPException(400, detail=str(exception)) from exception
374 1
        except ValueError as exception:
375
            log.error(exception)
376
            log.debug("update result %s %s", exception, 400)
377
            raise HTTPException(400, detail=str(exception)) from exception
378
        except DisabledSwitch as exception:
379 1
            log.debug("update result %s %s", exception, 409)
380
            raise HTTPException(
381
                    409,
382
                    detail=f"Path is not valid: {exception}"
383
                ) from exception
384
385
        if evc.is_active():
386
            if enable is False:  # disable if active
387
                with evc.lock:
388 1
                    evc.remove()
389 1
            elif redeploy is not None:  # redeploy if active
390 1
                with evc.lock:
391 1
                    evc.remove()
392 1
                    evc.deploy()
393
        else:
394 1
            if enable is True:  # enable if inactive
395 1
                with evc.lock:
396
                    evc.deploy()
397 1
        result = {evc.id: evc.as_dict()}
398
        status = 200
399 1
400 1
        log.debug("update result %s %s", result, status)
401
        emit_event(self.controller, "updated",
402
                   content=map_evc_event_content(evc, **data))
403
        return JSONResponse(result, status_code=status)
404
405
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
406 1
    def delete_circuit(self, request: Request) -> JSONResponse:
407 1
        """Remove a circuit.
408 1
409 1
        First, the flows are removed from the switches, and then the EVC is
410 1
        disabled.
411 1
        """
412 1
        circuit_id = request.path_params["circuit_id"]
413 1
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
414
        try:
415 1
            evc = self.circuits[circuit_id]
416 1
        except KeyError:
417 1
            result = f"circuit_id {circuit_id} not found"
418 1
            log.debug("delete_circuit result %s %s", result, 404)
419
            raise HTTPException(404, detail=result) from KeyError
420 1
421 1
        if evc.archived:
422 1
            result = f"Circuit {circuit_id} already removed"
423 1
            log.debug("delete_circuit result %s %s", result, 404)
424 1
            raise HTTPException(404, detail=result)
425 1
426 1
        log.info("Removing %s", evc)
427 1
        with evc.lock:
428 1
            evc.remove_current_flows()
429 1
            evc.remove_failover_flows(sync=False)
430 1
            evc.deactivate()
431 1
            evc.disable()
432 1
            self.sched.remove(evc)
433
            evc.archive()
434 1
            evc.remove_uni_tags()
435 1
            evc.sync()
436
        log.info("EVC removed. %s", evc)
437 1
        result = {"response": f"Circuit {circuit_id} removed"}
438
        status = 200
439 1
440 1
        log.debug("delete_circuit result %s %s", result, status)
441
        emit_event(self.controller, "deleted",
442 1
                   content=map_evc_event_content(evc))
443 1
        return JSONResponse(result, status_code=status)
444 1
445
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
446
    def get_metadata(self, request: Request) -> JSONResponse:
447
        """Get metadata from an EVC."""
448
        circuit_id = request.path_params["circuit_id"]
449
        try:
450
            return (
451
                JSONResponse({"metadata": self.circuits[circuit_id].metadata})
452
            )
453 1
        except KeyError as error:
454 1
            raise HTTPException(
455 1
                404,
456
                detail=f"circuit_id {circuit_id} not found."
457 1
            ) from error
458 1
459 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
460 1
    @validate_openapi(spec)
461
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
462 1
        """Add metadata to a bulk of EVCs."""
463 1
        data = get_json_or_400(request, self.controller.loop)
464 1
        circuit_ids = data.pop("circuit_ids")
465 1
466 1
        self.mongo_controller.update_evcs(circuit_ids, data, "add")
467 1
468 1
        fail_evcs = []
469
        for _id in circuit_ids:
470 1
            try:
471 1
                evc = self.circuits[_id]
472 1
                evc.extend_metadata(data)
473
            except KeyError:
474 1
                fail_evcs.append(_id)
475 1
476 1
        if fail_evcs:
477
            raise HTTPException(404, detail=fail_evcs)
478 1
        return JSONResponse("Operation successful", status_code=201)
479 1
480 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
481
    @validate_openapi(spec)
482 1
    def add_metadata(self, request: Request) -> JSONResponse:
483 1
        """Add metadata to an EVC."""
484 1
        circuit_id = request.path_params["circuit_id"]
485 1
        metadata = get_json_or_400(request, self.controller.loop)
486
        if not isinstance(metadata, dict):
487
            raise HTTPException(400, "Invalid metadata value: {metadata}")
488
        try:
489
            evc = self.circuits[circuit_id]
490 1
        except KeyError as error:
491 1
            raise HTTPException(
492 1
                404,
493
                detail=f"circuit_id {circuit_id} not found."
494 1
            ) from error
495 1
496 1
        evc.extend_metadata(metadata)
497
        evc.sync()
498 1
        return JSONResponse("Operation successful", status_code=201)
499 1
500 1 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
501 1
    @validate_openapi(spec)
502
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
503 1
        """Delete metada from a bulk of EVCs"""
504 1
        data = get_json_or_400(request, self.controller.loop)
505 1
        key = request.path_params["key"]
506 1
        circuit_ids = data.pop("circuit_ids")
507 1
        self.mongo_controller.update_evcs(circuit_ids, {key: ""}, "del")
508
509
        fail_evcs = []
510
        for _id in circuit_ids:
511 1
            try:
512
                evc = self.circuits[_id]
513 1
                evc.remove_metadata(key)
514
            except KeyError:
515 1
                fail_evcs.append(_id)
516 1
517
        if fail_evcs:
518 1
            raise HTTPException(404, detail=fail_evcs)
519 1
        return JSONResponse("Operation successful")
520 1
521 1
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
522 1
    def delete_metadata(self, request: Request) -> JSONResponse:
523 1
        """Delete metadata from an EVC."""
524
        circuit_id = request.path_params["circuit_id"]
525
        key = request.path_params["key"]
526
        try:
527
            evc = self.circuits[circuit_id]
528 1
        except KeyError as error:
529 1
            raise HTTPException(
530 1
                404,
531
                detail=f"circuit_id {circuit_id} not found."
532 1
            ) from error
533 1
534
        evc.remove_metadata(key)
535 1
        evc.sync()
536 1
        return JSONResponse("Operation successful")
537 1
538 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
539 1
    def redeploy(self, request: Request) -> JSONResponse:
540 1
        """Endpoint to force the redeployment of an EVC."""
541
        circuit_id = request.path_params["circuit_id"]
542
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
543
        try:
544 1
            evc = self.circuits[circuit_id]
545 1
        except KeyError:
546 1
            raise HTTPException(
547 1
                404,
548 1
                detail=f"circuit_id {circuit_id} not found"
549 1
            ) from KeyError
550
        if evc.is_enabled():
551 1
            with evc.lock:
552 1
                evc.remove_current_flows()
553
                evc.deploy()
554 1
            result = {"response": f"Circuit {circuit_id} redeploy received."}
555
            status = 202
556 1
        else:
557 1
            result = {"response": f"Circuit {circuit_id} is disabled."}
558 1
            status = 409
559
560
        return JSONResponse(result, status_code=status)
561
562
    @rest("/v2/evc/schedule/", methods=["POST"])
563
    @validate_openapi(spec)
564
    def create_schedule(self, request: Request) -> JSONResponse:
565
        """
566
        Create a new schedule for a given circuit.
567
568
        This service do no check if there are conflicts with another schedule.
569
        Payload example:
570
            {
571
              "circuit_id":"aa:bb:cc",
572
              "schedule": {
573
                "date": "2019-08-07T14:52:10.967Z",
574 1
                "interval": "string",
575 1
                "frequency": "1 * * * *",
576 1
                "action": "create"
577 1
              }
578
            }
579
        """
580 1
        log.debug("create_schedule /v2/evc/schedule/")
581
        data = get_json_or_400(request, self.controller.loop)
582
        circuit_id = data["circuit_id"]
583 1
        schedule_data = data["schedule"]
584
585
        # Get EVC from circuits buffer
586 1
        circuits = self._get_circuits_buffer()
587 1
588 1
        # get the circuit
589 1
        evc = circuits.get(circuit_id)
590
591 1
        # get the circuit
592 1
        if not evc:
593 1
            result = f"circuit_id {circuit_id} not found"
594 1
            log.debug("create_schedule result %s %s", result, 404)
595
            raise HTTPException(404, detail=result)
596
        # Can not modify circuits deleted and archived
597 1
        if evc.archived:
598
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
599
            log.debug("create_schedule result %s %s", result, 409)
600 1
            raise HTTPException(409, detail=result)
601 1
602
        # new schedule from dict
603
        new_schedule = CircuitSchedule.from_dict(schedule_data)
604 1
605
        # If there is no schedule, create the list
606
        if not evc.circuit_scheduler:
607 1
            evc.circuit_scheduler = []
608
609
        # Add the new schedule
610 1
        evc.circuit_scheduler.append(new_schedule)
611
612 1
        # Add schedule job
613 1
        self.sched.add_circuit_job(evc, new_schedule)
614
615 1
        # save circuit to mongodb
616 1
        evc.sync()
617
618 1
        result = new_schedule.as_dict()
619 1
        status = 201
620 1
621
        log.debug("create_schedule result %s %s", result, status)
622
        return JSONResponse(result, status_code=status)
623
624
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
625
    @validate_openapi(spec)
626
    def update_schedule(self, request: Request) -> JSONResponse:
627
        """Update a schedule.
628
629
        Change all attributes from the given schedule from a EVC circuit.
630
        The schedule ID is preserved as default.
631
        Payload example:
632
            {
633 1
              "date": "2019-08-07T14:52:10.967Z",
634 1
              "interval": "string",
635 1
              "frequency": "1 * * *",
636
              "action": "create"
637
            }
638 1
        """
639
        data = get_json_or_400(request, self.controller.loop)
640
        schedule_id = request.path_params["schedule_id"]
641 1
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)
642 1
643 1
        # Try to find a circuit schedule
644 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
645 1
646 1
        # Can not modify circuits deleted and archived
647 1
        if not found_schedule:
648 1
            result = f"schedule_id {schedule_id} not found"
649
            log.debug("update_schedule result %s %s", result, 404)
650 1
            raise HTTPException(404, detail=result)
651 1
        if evc.archived:
652
            result = f"Circuit {evc.id} is archived. Update is forbidden."
653 1
            log.debug("update_schedule result %s %s", result, 409)
654
            raise HTTPException(409, detail=result)
655 1
656
        new_schedule = CircuitSchedule.from_dict(data)
657
        new_schedule.id = found_schedule.id
658 1
        # Remove the old schedule
659
        evc.circuit_scheduler.remove(found_schedule)
660 1
        # Append the modified schedule
661
        evc.circuit_scheduler.append(new_schedule)
662 1
663
        # Cancel all schedule jobs
664 1
        self.sched.cancel_job(found_schedule.id)
665 1
        # Add the new circuit schedule
666
        self.sched.add_circuit_job(evc, new_schedule)
667 1
        # Save EVC to mongodb
668 1
        evc.sync()
669
670 1
        result = new_schedule.as_dict()
671 1
        status = 200
672
673
        log.debug("update_schedule result %s %s", result, status)
674
        return JSONResponse(result, status_code=status)
675
676
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
677
    def delete_schedule(self, request: Request) -> JSONResponse:
678 1
        """Remove a circuit schedule.
679 1
680 1
        Remove the Schedule from EVC.
681
        Remove the Schedule from cron job.
682
        Save the EVC to the Storehouse.
683 1
        """
684 1
        schedule_id = request.path_params["schedule_id"]
685 1
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
686 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
687
688 1
        # Can not modify circuits deleted and archived
689 1
        if not found_schedule:
690 1
            result = f"schedule_id {schedule_id} not found"
691 1
            log.debug("delete_schedule result %s %s", result, 404)
692
            raise HTTPException(404, detail=result)
693
694 1
        if evc.archived:
695
            result = f"Circuit {evc.id} is archived. Update is forbidden."
696
            log.debug("delete_schedule result %s %s", result, 409)
697 1
            raise HTTPException(409, detail=result)
698
699 1
        # Remove the old schedule
700
        evc.circuit_scheduler.remove(found_schedule)
701 1
702 1
        # Cancel all schedule jobs
703
        self.sched.cancel_job(found_schedule.id)
704 1
        # Save EVC to mongodb
705 1
        evc.sync()
706
707 1
        result = "Schedule removed"
708
        status = 200
709
710
        log.debug("delete_schedule result %s %s", result, status)
711
        return JSONResponse(result, status_code=status)
712
713
    def _is_duplicated_evc(self, evc):
714
        """Verify if the circuit given is duplicated with the stored evcs.
715
716
        Args:
717 1
            evc (EVC): circuit to be analysed.
718 1
719 1
        Returns:
720 1
            boolean: True if the circuit is duplicated, otherwise False.
721
722 1
        """
723 1
        for circuit in tuple(self.circuits.values()):
724
            if not circuit.archived and circuit.shares_uni(evc):
725
                return True
726
        return False
727 1
728
    @listen_to("kytos/topology.link_up")
729 1
    def on_link_up(self, event):
730 1
        """Change circuit when link is up or end_maintenance."""
731 1
        self.handle_link_up(event)
732 1
733 1
    def handle_link_up(self, event):
734
        """Change circuit when link is up or end_maintenance."""
735 1
        log.info("Event handle_link_up %s", event.content["link"])
736 1
        for evc in self.get_evcs_by_svc_level():
737
            if evc.is_enabled() and not evc.archived:
738
                with evc.lock:
739
                    evc.handle_link_up(event.content["link"])
740 1
741
    # Possibly replace this with interruptions?
742 1
    @listen_to(
743 1
        '.*.switch.interface.(link_up|link_down|created|deleted)',
744
        pool='dynamic_single'
745
    )
746
    def on_interface_link_change(self, event: KytosEvent):
747
        """
748 1
        Handler for interface link_up and link_down events
749 1
        """
750 1
        with self._lock:
751 1
            _, _, event_type = event.name.rpartition('.')
752 1
            iface = event.content.get("interface")
753
            if event_type in ('link_up', 'created'):
754
                self.handle_interface_link_up(iface)
755
            elif event_type in ('link_down', 'deleted'):
756 1
                self.handle_interface_link_down(iface)
757 1
758
    def handle_interface_link_up(self, interface):
759
        """
760
        Handler for interface link_up events
761 1
        """
762
        for evc in self.get_evcs_by_svc_level():
763 1
            log.info("Event handle_interface_link_up %s", interface)
764 1
            evc.handle_interface_link_up(
765 1
                interface
766 1
            )
767 1
768 1
    def handle_interface_link_down(self, interface):
769 1
        """
770 1
        Handler for interface link_down events
771
        """
772
        for evc in self.get_evcs_by_svc_level():
773 1
            log.info("Event handle_interface_link_down %s", interface)
774
            evc.handle_interface_link_down(
775
                interface
776
            )
777 1
778 1
    @listen_to("kytos/topology.link_down")
779 1
    def on_link_down(self, event):
780 1
        """Change circuit when link is down or under_mantenance."""
781
        self.handle_link_down(event)
782 1
783 1
    def handle_link_down(self, event):  # pylint: disable=too-many-branches
784 1
        """Change circuit when link is down or under_mantenance."""
785
        link = event.content["link"]
786
        log.info("Event handle_link_down %s", link)
787 1
        switch_flows = {}
788 1
        evcs_with_failover = []
789 1
        evcs_normal = []
790 1
        check_failover = []
791 1
        for evc in self.get_evcs_by_svc_level():
792 1
            if evc.is_affected_by_link(link):
793
                # if there is no failover path, handles link down the
794 1
                # tradditional way
795
                if (
796 1
                    not getattr(evc, 'failover_path', None) or
797 1
                    evc.is_failover_path_affected_by_link(link)
798 1
                ):
799 1
                    evcs_normal.append(evc)
800 1
                    continue
801
                try:
802
                    dpid_flows = evc.get_failover_flows()
803
                # pylint: disable=broad-except
804
                except Exception:
805
                    err = traceback.format_exc().replace("\n", ", ")
806
                    log.error(
807
                        f"Ignore Failover path for {evc} due to error: {err}"
808
                    )
809 1
                    evcs_normal.append(evc)
810 1
                    continue
811 1
                for dpid, flows in dpid_flows.items():
812 1
                    switch_flows.setdefault(dpid, [])
813 1
                    switch_flows[dpid].extend(flows)
814
                evcs_with_failover.append(evc)
815 1
            else:
816 1
                check_failover.append(evc)
817 1
818 1
        while switch_flows:
819 1
            offset = settings.BATCH_SIZE or None
820 1
            switches = list(switch_flows.keys())
821 1
            for dpid in switches:
822
                emit_event(
823 1
                    self.controller,
824
                    context="kytos.flow_manager",
825
                    name="flows.install",
826
                    content={
827 1
                        "dpid": dpid,
828 1
                        "flow_dict": {"flows": switch_flows[dpid][:offset]},
829
                    }
830
                )
831
                if offset is None or offset >= len(switch_flows[dpid]):
832
                    del switch_flows[dpid]
833
                    continue
834
                switch_flows[dpid] = switch_flows[dpid][offset:]
835
            time.sleep(settings.BATCH_INTERVAL)
836
837
        for evc in evcs_with_failover:
838 1
            with evc.lock:
839 1
                old_path = evc.current_path
840 1
                evc.current_path = evc.failover_path
841 1
                evc.failover_path = old_path
842
                evc.sync()
843 1
            emit_event(self.controller, "redeployed_link_down",
844 1
                       content=map_evc_event_content(evc))
845
            log.info(
846
                f"{evc} redeployed with failover due to link down {link.id}"
847
            )
848 1
849
        for evc in evcs_normal:
850 1
            emit_event(
851 1
                self.controller,
852 1
                "evc_affected_by_link_down",
853 1
                content={"link_id": link.id} | map_evc_event_content(evc)
854 1
            )
855 1
856 1
        # After handling the hot path, check if new failover paths are needed.
857 1
        # Note that EVCs affected by link down will generate a KytosEvent for
858 1
        # deployed|redeployed, which will trigger the failover path setup.
859 1
        # Thus, we just need to further check the check_failover list
860 1
        for evc in check_failover:
861
            if evc.is_failover_path_affected_by_link(link):
862
                with evc.lock:
863 1
                    evc.setup_failover_path()
864 1
865
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
866
    def on_evc_affected_by_link_down(self, event):
867
        """Change circuit when link is down or under_mantenance."""
868 1
        self.handle_evc_affected_by_link_down(event)
869
870 1
    def handle_evc_affected_by_link_down(self, event):
871 1
        """Change circuit when link is down or under_mantenance."""
872 1
        evc = self.circuits.get(event.content["evc_id"])
873 1
        link_id = event.content['link_id']
874 1
        if not evc:
875
            return
876 1
        with evc.lock:
877 1
            result = evc.handle_link_down()
878
        event_name = "error_redeploy_link_down"
879
        if result:
880
            log.info(f"{evc} redeployed due to link down {link_id}")
881 1
            event_name = "redeployed_link_down"
882
        emit_event(self.controller, event_name,
883 1
                   content=map_evc_event_content(evc))
884 1
885 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
886 1
    def on_evc_deployed(self, event):
887
        """Handle EVC deployed|redeployed_link_down."""
888 1
        self.handle_evc_deployed(event)
889
890 1
    def handle_evc_deployed(self, event):
891 1
        """Setup failover path on evc deployed."""
892 1
        evc = self.circuits.get(event.content["evc_id"])
893 1
        if not evc:
894
            return
895
        with evc.lock:
896 1
            evc.setup_failover_path()
897
898 1
    @listen_to("kytos/topology.topology_loaded")
899 1
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
900 1
        """Load EVCs once the topology is available."""
901 1
        self.load_all_evcs()
902 1
903 1
    def load_all_evcs(self):
904 1
        """Try to load all EVCs on startup."""
905
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
906 1
        for circuit_id, circuit in circuits:
907 1
            if circuit_id not in self.circuits:
908
                self._load_evc(circuit)
909
910
    def _load_evc(self, circuit_dict):
911 1
        """Load one EVC from mongodb to memory."""
912
        try:
913 1
            evc = self._evc_from_dict(circuit_dict)
914 1
        except ValueError as exception:
915 1
            log.error(
916
                f"Could not load EVC: dict={circuit_dict} error={exception}"
917 1
            )
918 1
            return None
919 1
920 1
        if evc.archived:
921
            return None
922 1
923
        self.circuits.setdefault(evc.id, evc)
924
        self.sched.add(evc)
925
        return evc
926
927 1
    @listen_to("kytos/flow_manager.flow.error")
928 1
    def on_flow_mod_error(self, event):
929
        """Handle flow mod errors related to an EVC."""
930
        self.handle_flow_mod_error(event)
931 1
932 1
    def handle_flow_mod_error(self, event):
933 1
        """Handle flow mod errors related to an EVC."""
934 1
        flow = event.content["flow"]
935 1
        command = event.content.get("error_command")
936 1
        if command != "add":
937
            return
938 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
939 1
        if evc:
940 1
            with evc.lock:
941 1
                evc.remove_current_flows()
942
943
    def _evc_dict_with_instances(self, evc_dict):
944
        """Convert some dict values to instance of EVC classes.
945
946
        This method will convert: [UNI, Link]
947
        """
948
        data = evc_dict.copy()  # Do not modify the original dict
949 1
        for attribute, value in data.items():
950 1
            # Get multiple attributes.
951
            # Ex: uni_a, uni_z
952
            if "uni" in attribute:
953
                try:
954
                    data[attribute] = self._uni_from_dict(value)
955
                except ValueError as exception:
956
                    result = f"Error creating UNI: {exception}"
957 1
                    raise ValueError(result) from exception
958 1
959
            if attribute == "circuit_scheduler":
960
                data[attribute] = []
961
                for schedule in value:
962 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
963
964 1
            # Get multiple attributes.
965 1
            # Ex: primary_links,
966 1
            #     backup_links,
967 1
            #     current_links_cache,
968
            #     primary_links_cache,
969 1
            #     backup_links_cache
970
            if "links" in attribute:
971 1
                data[attribute] = [
972 1
                    self._link_from_dict(link) for link in value
973
                ]
974 1
975 1
            # Ex: current_path,
976 1
            #     primary_path,
977 1
            #     backup_path
978
            if "path" in attribute and attribute != "dynamic_backup_path":
979
                data[attribute] = Path(
980
                    [self._link_from_dict(link) for link in value]
981 1
                )
982 1
983 1
        return data
984 1
985 1
    def _evc_from_dict(self, evc_dict):
986 1
        data = self._evc_dict_with_instances(evc_dict)
987 1
        data["table_group"] = self.table_group
988 1
        return EVC(self.controller, **data)
989
990 1
    def _uni_from_dict(self, uni_dict):
991 1
        """Return a UNI object from python dict."""
992
        if uni_dict is None:
993 1
            return False
994
995 1
        interface_id = uni_dict.get("interface_id")
996
        interface = self.controller.get_interface_by_id(interface_id)
997 1
        if interface is None:
998 1
            result = (
999
                "Error creating UNI:"
1000 1
                + f"Could not instantiate interface {interface_id}"
1001 1
            )
1002 1
            raise ValueError(result) from ValueError
1003 1
        tag_convert = {1: "vlan"}
1004 1
        tag_dict = uni_dict.get("tag", None)
1005 1
        if tag_dict:
1006
            tag_type = tag_dict.get("tag_type")
1007
            tag_type = tag_convert.get(tag_type, tag_type)
1008
            tag_value = tag_dict.get("value")
1009 1
            tag = TAG(tag_type, tag_value)
1010 1
        else:
1011 1
            tag = None
1012
        uni = UNI(interface, tag)
1013 1
1014 1
        return uni
1015 1
1016 1
    def _link_from_dict(self, link_dict):
1017
        """Return a Link object from python dict."""
1018
        id_a = link_dict.get("endpoint_a").get("id")
1019 1
        id_b = link_dict.get("endpoint_b").get("id")
1020 1
1021
        endpoint_a = self.controller.get_interface_by_id(id_a)
1022 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
1023
        if not endpoint_a:
1024
            error_msg = f"Could not get interface endpoint_a id {id_a}"
1025
            raise ValueError(error_msg)
1026
        if not endpoint_b:
1027
            error_msg = f"Could not get interface endpoint_b id {id_b}"
1028
            raise ValueError(error_msg)
1029 1
1030 1
        link = Link(endpoint_a, endpoint_b)
1031 1
        if "metadata" in link_dict:
1032
            link.extend_metadata(link_dict.get("metadata"))
1033
1034 1
        s_vlan = link.get_metadata("s_vlan")
1035 1
        if s_vlan:
1036 1
            tag = TAG.from_dict(s_vlan)
1037 1
            if tag is False:
1038 1
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
1039 1
                raise ValueError(error_msg)
1040 1
            link.update_metadata("s_vlan", tag)
1041 1
        return link
1042 1
1043
    def _find_evc_by_schedule_id(self, schedule_id):
1044 1
        """
1045
        Find an EVC and CircuitSchedule based on schedule_id.
1046
1047
        :param schedule_id: Schedule ID
1048
        :return: EVC and Schedule
1049
        """
1050 1
        circuits = self._get_circuits_buffer()
1051
        found_schedule = None
1052 1
        evc = None
1053 1
1054 1
        # pylint: disable=unused-variable
1055 1
        for c_id, circuit in circuits.items():
1056 1
            for schedule in circuit.circuit_scheduler:
1057
                if schedule.id == schedule_id:
1058
                    found_schedule = schedule
1059 1
                    evc = circuit
1060 1
                    break
1061
            if found_schedule:
1062 1
                break
1063 1
        return evc, found_schedule
1064
1065 1
    def _get_circuits_buffer(self):
1066 1
        """
1067 1
        Return the circuit buffer.
1068
1069
        If the buffer is empty, try to load data from mongodb.
1070 1
        """
1071 1
        if not self.circuits:
1072 1
            # Load circuits from mongodb to buffer
1073 1
            circuits = self.mongo_controller.get_circuits()['circuits']
1074 1
            for c_id, circuit in circuits.items():
1075
                evc = self._evc_from_dict(circuit)
1076
                self.circuits[c_id] = evc
1077
        return self.circuits
1078
1079
    # pylint: disable=attribute-defined-outside-init
1080
    @alisten_to("kytos/of_multi_table.enable_table")
1081
    async def on_table_enabled(self, event):
1082
        """Handle a recently table enabled."""
1083
        table_group = event.content.get("mef_eline", None)
1084
        if not table_group:
1085
            return
1086
        for group in table_group:
1087
            if group not in settings.TABLE_GROUP_ALLOWED:
1088
                log.error(f'The table group "{group}" is not allowed for '
1089
                          f'mef_eline. Allowed table groups are '
1090
                          f'{settings.TABLE_GROUP_ALLOWED}')
1091
                return
1092
        self.table_group.update(table_group)
1093
        content = {"group_table": self.table_group}
1094
        name = "kytos/mef_eline.enable_table"
1095
        await aemit_event(self.controller, name, content)
1096