Passed
Push — master ( 5cb22e...3e1f49 )
by Vinicius
12:54 queued 10:13
created

build.main.Main.on_interface_link_change()   A

Complexity

Conditions 4

Size

Total Lines 15
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 11.5287

Importance

Changes 0
Metric Value
cc 4
eloc 11
nop 2
dl 0
loc 15
ccs 2
cts 9
cp 0.2222
crap 11.5287
rs 9.85
c 0
b 0
f 0
1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from threading import Lock
10
11 1
from pydantic import ValidationError
12
13 1
from kytos.core import KytosNApp, log, rest
14 1
from kytos.core.events import KytosEvent
15 1
from kytos.core.helpers import (alisten_to, listen_to, load_spec,
16
                                validate_openapi)
17 1
from kytos.core.interface import TAG, UNI
18 1
from kytos.core.link import Link
19 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
20
                                 get_json_or_400)
21 1
from napps.kytos.mef_eline import controllers, settings
22 1
from napps.kytos.mef_eline.exceptions import DisabledSwitch, InvalidPath
23 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
24
                                          Path)
25 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
26 1
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
27
                                         emit_event, map_evc_event_content)
28
29
30
# pylint: disable=too-many-public-methods
31 1
class Main(KytosNApp):
32
    """Main class of amlight/mef_eline NApp.
33
34
    This class is the entry point for this napp.
35
    """
36
37 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
38
39 1
    def setup(self):
40
        """Replace the '__init__' method for the KytosNApp subclass.
41
42
        The setup method is automatically called by the controller when your
43
        application is loaded.
44
45
        So, if you have any setup routine, insert it here.
46
        """
47
        # object used to scheduler circuit events
48 1
        self.sched = Scheduler()
49
50
        # object to save and load circuits
51 1
        self.mongo_controller = self.get_eline_controller()
52 1
        self.mongo_controller.bootstrap_indexes()
53
54
        # set the controller that will manager the dynamic paths
55 1
        DynamicPathManager.set_controller(self.controller)
56
57
        # dictionary of EVCs created. It acts as a circuit buffer.
58
        # Every create/update/delete must be synced to mongodb.
59 1
        self.circuits = {}
60
61 1
        self.table_group = {"epl": 0, "evpl": 0}
62 1
        self._lock = Lock()
63 1
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)
64
65 1
        self.load_all_evcs()
66 1
        self._topology_updated_at = None
67
68 1
    def get_evcs_by_svc_level(self) -> list:
69
        """Get circuits sorted by desc service level and asc creation_time.
70
71
        In the future, as more ops are offloaded it should be get from the DB.
72
        """
73 1
        return sorted(self.circuits.values(),
74
                      key=lambda x: (-x.service_level, x.creation_time))
75
76 1
    @staticmethod
77 1
    def get_eline_controller():
78
        """Return the ELineController instance."""
79
        return controllers.ELineController()
80
81 1
    def execute(self):
82
        """Execute once when the napp is running."""
83 1
        if self._lock.locked():
84 1
            return
85 1
        log.debug("Starting consistency routine")
86 1
        with self._lock:
87 1
            self.execute_consistency()
88 1
        log.debug("Finished consistency routine")
89
90 1
    def should_be_checked(self, circuit):
91
        "Verify if the circuit meets the necessary conditions to be checked"
92
        # pylint: disable=too-many-boolean-expressions
93 1
        if (
94
                circuit.is_enabled()
95
                and not circuit.is_active()
96
                and not circuit.lock.locked()
97
                and not circuit.has_recent_removed_flow()
98
                and not circuit.is_recent_updated()
99
                and circuit.are_unis_active(self.controller.switches)
100
                # if a inter-switch EVC does not have current_path, it does not
101
                # make sense to run sdntrace on it
102
                and (circuit.is_intra_switch() or circuit.current_path)
103
                ):
104 1
            return True
105
        return False
106
107 1
    def execute_consistency(self):
108
        """Execute consistency routine."""
109 1
        circuits_to_check = []
110 1
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
111 1
        for circuit in self.get_evcs_by_svc_level():
112 1
            stored_circuits.pop(circuit.id, None)
113 1
            if self.should_be_checked(circuit):
114 1
                circuits_to_check.append(circuit)
115 1
        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
116 1
        for circuit in circuits_to_check:
117 1
            is_checked = circuits_checked.get(circuit.id)
118 1
            if is_checked:
119 1
                circuit.execution_rounds = 0
120 1
                log.info(f"{circuit} enabled but inactive - activating")
121 1
                with circuit.lock:
122 1
                    circuit.activate()
123 1
                    circuit.sync()
124
            else:
125 1
                circuit.execution_rounds += 1
126 1
                if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
127 1
                    log.info(f"{circuit} enabled but inactive - redeploy")
128 1
                    with circuit.lock:
129 1
                        circuit.deploy()
130 1
        for circuit_id in stored_circuits:
131 1
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
132 1
            self._load_evc(stored_circuits[circuit_id])
133
134 1
    def shutdown(self):
135
        """Execute when your napp is unloaded.
136
137
        If you have some cleanup procedure, insert it here.
138
        """
139
140 1
    @rest("/v2/evc/", methods=["GET"])
141 1
    def list_circuits(self, request: Request) -> JSONResponse:
142
        """Endpoint to return circuits stored.
143
144
        archive query arg if defined (not null) will be filtered
145
        accordingly, by default only non archived evcs will be listed
146
        """
147 1
        log.debug("list_circuits /v2/evc")
148 1
        args = request.query_params
149 1
        archived = args.get("archived", "false").lower()
150 1
        args = {k: v for k, v in args.items() if k not in {"archived"}}
151 1
        circuits = self.mongo_controller.get_circuits(archived=archived,
152
                                                      metadata=args)
153 1
        circuits = circuits['circuits']
154 1
        return JSONResponse(circuits)
155
156 1
    @rest("/v2/evc/schedule", methods=["GET"])
157 1
    def list_schedules(self, _request: Request) -> JSONResponse:
158
        """Endpoint to return all schedules stored for all circuits.
159
160
        Return a JSON with the following template:
161
        [{"schedule_id": <schedule_id>,
162
         "circuit_id": <circuit_id>,
163
         "schedule": <schedule object>}]
164
        """
165 1
        log.debug("list_schedules /v2/evc/schedule")
166 1
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
167 1
        if not circuits:
168 1
            result = {}
169 1
            status = 200
170 1
            return JSONResponse(result, status_code=status)
171
172 1
        result = []
173 1
        status = 200
174 1
        for circuit in circuits:
175 1
            circuit_scheduler = circuit.get("circuit_scheduler")
176 1
            if circuit_scheduler:
177 1
                for scheduler in circuit_scheduler:
178 1
                    value = {
179
                        "schedule_id": scheduler.get("id"),
180
                        "circuit_id": circuit.get("id"),
181
                        "schedule": scheduler,
182
                    }
183 1
                    result.append(value)
184
185 1
        log.debug("list_schedules result %s %s", result, status)
186 1
        return JSONResponse(result, status_code=status)
187
188 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
189 1
    def get_circuit(self, request: Request) -> JSONResponse:
190
        """Endpoint to return a circuit based on id."""
191 1
        circuit_id = request.path_params["circuit_id"]
192 1
        log.debug("get_circuit /v2/evc/%s", circuit_id)
193 1
        circuit = self.mongo_controller.get_circuit(circuit_id)
194 1
        if not circuit:
195 1
            result = f"circuit_id {circuit_id} not found"
196 1
            log.debug("get_circuit result %s %s", result, 404)
197 1
            raise HTTPException(404, detail=result)
198 1
        status = 200
199 1
        log.debug("get_circuit result %s %s", circuit, status)
200 1
        return JSONResponse(circuit, status_code=status)
201
202 1
    @rest("/v2/evc/", methods=["POST"])
203 1
    @validate_openapi(spec)
204 1
    def create_circuit(self, request: Request) -> JSONResponse:
205
        """Try to create a new circuit.
206
207
        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
208
        UNI_Z's requested C-VID are available from the interfaces' pools. This
209
        is checked when creating the UNI object.
210
211
        Then, E-Line NApp requests a primary and a backup path to the
212
        Pathfinder NApp using the attributes primary_links and backup_links
213
        submitted via REST
214
215
        # For each link composing paths in #3:
216
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
217
        #  - Using the S-VID obtained, generate abstract flow entries to be
218
        #    sent to FlowManager
219
220
        Push abstract flow entries to FlowManager and FlowManager pushes
221
        OpenFlow entries to datapaths
222
223
        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
224
        creation
225
226
        Finnaly, notify user of the status of its request.
227
        """
228
        # Try to create the circuit object
229 1
        log.debug("create_circuit /v2/evc/")
230 1
        data = get_json_or_400(request, self.controller.loop)
231
232 1
        try:
233 1
            evc = self._evc_from_dict(data)
234 1
        except ValueError as exception:
235 1
            log.debug("create_circuit result %s %s", exception, 400)
236 1
            raise HTTPException(400, detail=str(exception)) from exception
237
238 1
        try:
239 1
            check_disabled_component(evc.uni_a, evc.uni_z)
240 1
        except DisabledSwitch as exception:
241 1
            log.debug("create_circuit result %s %s", exception, 409)
242 1
            raise HTTPException(
243
                    409,
244
                    detail=f"Path is not valid: {exception}"
245
                ) from exception
246
247 1
        if evc.primary_path:
248
            try:
249
                evc.primary_path.is_valid(
250
                    evc.uni_a.interface.switch,
251
                    evc.uni_z.interface.switch,
252
                    bool(evc.circuit_scheduler),
253
                )
254
            except InvalidPath as exception:
255
                raise HTTPException(
256
                    400,
257
                    detail=f"primary_path is not valid: {exception}"
258
                ) from exception
259 1
        if evc.backup_path:
260
            try:
261
                evc.backup_path.is_valid(
262
                    evc.uni_a.interface.switch,
263
                    evc.uni_z.interface.switch,
264
                    bool(evc.circuit_scheduler),
265
                )
266
            except InvalidPath as exception:
267
                raise HTTPException(
268
                    400,
269
                    detail=f"backup_path is not valid: {exception}"
270
                ) from exception
271
272
        # verify duplicated evc
273 1
        if self._is_duplicated_evc(evc):
274 1
            result = "The EVC already exists."
275 1
            log.debug("create_circuit result %s %s", result, 409)
276 1
            raise HTTPException(409, detail=result)
277
278 1
        try:
279 1
            evc._validate_has_primary_or_dynamic()
280 1
        except ValueError as exception:
281 1
            raise HTTPException(400, detail=str(exception)) from exception
282
283 1
        try:
284 1
            self._use_uni_tags(evc)
285
        except ValueError as exception:
286
            raise HTTPException(400, detail=str(exception)) from exception
287
288
        # save circuit
289 1
        try:
290 1
            evc.sync()
291
        except ValidationError as exception:
292
            raise HTTPException(400, detail=str(exception)) from exception
293
294
        # store circuit in dictionary
295 1
        self.circuits[evc.id] = evc
296
297
        # Schedule the circuit deploy
298 1
        self.sched.add(evc)
299
300
        # Circuit has no schedule, deploy now
301 1
        if not evc.circuit_scheduler:
302 1
            with evc.lock:
303 1
                evc.deploy()
304
305
        # Notify users
306 1
        result = {"circuit_id": evc.id}
307 1
        status = 201
308 1
        log.debug("create_circuit result %s %s", result, status)
309 1
        emit_event(self.controller, name="created",
310
                   content=map_evc_event_content(evc))
311 1
        return JSONResponse(result, status_code=status)
312
313 1
    @staticmethod
314 1
    def _use_uni_tags(evc):
315 1
        uni_a = evc.uni_a
316 1
        try:
317 1
            evc._use_uni_vlan(uni_a)
318 1
        except ValueError as err:
319 1
            raise err
320 1
        try:
321 1
            uni_z = evc.uni_z
322 1
            evc._use_uni_vlan(uni_z)
323 1
        except ValueError as err:
324 1
            evc.make_uni_vlan_available(uni_a)
325 1
            raise err
326
327 1
    @listen_to('kytos/flow_manager.flow.removed')
328 1
    def on_flow_delete(self, event):
329
        """Capture delete messages to keep track when flows got removed."""
330
        self.handle_flow_delete(event)
331
332 1
    def handle_flow_delete(self, event):
333
        """Keep track when the EVC got flows removed by deriving its cookie."""
334 1
        flow = event.content["flow"]
335 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
336 1
        if evc:
337 1
            log.debug("Flow removed in EVC %s", evc.id)
338 1
            evc.set_flow_removed_at()
339
340 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
341 1
    @validate_openapi(spec)
342 1
    def update(self, request: Request) -> JSONResponse:
343
        """Update a circuit based on payload.
344
345
        The EVC attributes (creation_time, active, current_path,
346
        failover_path, _id, archived) can't be updated.
347
        """
348 1
        data = get_json_or_400(request, self.controller.loop)
349 1
        circuit_id = request.path_params["circuit_id"]
350 1
        log.debug("update /v2/evc/%s", circuit_id)
351 1
        try:
352 1
            evc = self.circuits[circuit_id]
353 1
        except KeyError:
354 1
            result = f"circuit_id {circuit_id} not found"
355 1
            log.debug("update result %s %s", result, 404)
356 1
            raise HTTPException(404, detail=result) from KeyError
357
358 1
        if evc.archived:
359 1
            result = "Can't update archived EVC"
360 1
            log.debug("update result %s %s", result, 409)
361 1
            raise HTTPException(409, detail=result)
362
363 1
        try:
364 1
            enable, redeploy = evc.update(
365
                **self._evc_dict_with_instances(data)
366
            )
367 1
        except ValidationError as exception:
368
            raise HTTPException(400, detail=str(exception)) from exception
369 1
        except ValueError as exception:
370 1
            log.error(exception)
371 1
            log.debug("update result %s %s", exception, 400)
372 1
            raise HTTPException(400, detail=str(exception)) from exception
373 1
        except DisabledSwitch as exception:
374 1
            log.debug("update result %s %s", exception, 409)
375 1
            raise HTTPException(
376
                    409,
377
                    detail=f"Path is not valid: {exception}"
378
                ) from exception
379
380 1
        if evc.is_active():
381
            if enable is False:  # disable if active
382
                with evc.lock:
383
                    evc.remove()
384
            elif redeploy is not None:  # redeploy if active
385
                with evc.lock:
386
                    evc.remove()
387
                    evc.deploy()
388
        else:
389 1
            if enable is True:  # enable if inactive
390 1
                with evc.lock:
391 1
                    evc.deploy()
392 1
        result = {evc.id: evc.as_dict()}
393 1
        status = 200
394
395 1
        log.debug("update result %s %s", result, status)
396 1
        emit_event(self.controller, "updated",
397
                   content=map_evc_event_content(evc, **data))
398 1
        return JSONResponse(result, status_code=status)
399
400 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
401 1
    def delete_circuit(self, request: Request) -> JSONResponse:
402
        """Remove a circuit.
403
404
        First, the flows are removed from the switches, and then the EVC is
405
        disabled.
406
        """
407 1
        circuit_id = request.path_params["circuit_id"]
408 1
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
409 1
        try:
410 1
            evc = self.circuits[circuit_id]
411 1
        except KeyError:
412 1
            result = f"circuit_id {circuit_id} not found"
413 1
            log.debug("delete_circuit result %s %s", result, 404)
414 1
            raise HTTPException(404, detail=result) from KeyError
415
416 1
        if evc.archived:
417 1
            result = f"Circuit {circuit_id} already removed"
418 1
            log.debug("delete_circuit result %s %s", result, 404)
419 1
            raise HTTPException(404, detail=result)
420
421 1
        log.info("Removing %s", evc)
422 1
        with evc.lock:
423 1
            evc.remove_current_flows()
424 1
            evc.remove_failover_flows(sync=False)
425 1
            evc.deactivate()
426 1
            evc.disable()
427 1
            self.sched.remove(evc)
428 1
            evc.archive()
429 1
            evc.remove_uni_tags()
430 1
            evc.sync()
431 1
        log.info("EVC removed. %s", evc)
432 1
        result = {"response": f"Circuit {circuit_id} removed"}
433 1
        status = 200
434
435 1
        log.debug("delete_circuit result %s %s", result, status)
436 1
        emit_event(self.controller, "deleted",
437
                   content=map_evc_event_content(evc))
438 1
        return JSONResponse(result, status_code=status)
439
440 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
441 1
    def get_metadata(self, request: Request) -> JSONResponse:
442
        """Get metadata from an EVC."""
443 1
        circuit_id = request.path_params["circuit_id"]
444 1
        try:
445 1
            return (
446
                JSONResponse({"metadata": self.circuits[circuit_id].metadata})
447
            )
448
        except KeyError as error:
449
            raise HTTPException(
450
                404,
451
                detail=f"circuit_id {circuit_id} not found."
452
            ) from error
453
454 1 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
455 1
    @validate_openapi(spec)
456 1
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
457
        """Add metadata to a bulk of EVCs."""
458 1
        data = get_json_or_400(request, self.controller.loop)
459 1
        circuit_ids = data.pop("circuit_ids")
460
461 1
        self.mongo_controller.update_evcs(circuit_ids, data, "add")
462
463 1
        fail_evcs = []
464 1
        for _id in circuit_ids:
465 1
            try:
466 1
                evc = self.circuits[_id]
467 1
                evc.extend_metadata(data)
468 1
            except KeyError:
469 1
                fail_evcs.append(_id)
470
471 1
        if fail_evcs:
472 1
            raise HTTPException(404, detail=fail_evcs)
473 1
        return JSONResponse("Operation successful", status_code=201)
474
475 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
476 1
    @validate_openapi(spec)
477 1
    def add_metadata(self, request: Request) -> JSONResponse:
478
        """Add metadata to an EVC."""
479 1
        circuit_id = request.path_params["circuit_id"]
480 1
        metadata = get_json_or_400(request, self.controller.loop)
481 1
        if not isinstance(metadata, dict):
482
            raise HTTPException(400, "Invalid metadata value: {metadata}")
483 1
        try:
484 1
            evc = self.circuits[circuit_id]
485 1
        except KeyError as error:
486 1
            raise HTTPException(
487
                404,
488
                detail=f"circuit_id {circuit_id} not found."
489
            ) from error
490
491 1
        evc.extend_metadata(metadata)
492 1
        evc.sync()
493 1
        return JSONResponse("Operation successful", status_code=201)
494
495 1 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
496 1
    @validate_openapi(spec)
497 1
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
498
        """Delete metada from a bulk of EVCs"""
499 1
        data = get_json_or_400(request, self.controller.loop)
500 1
        key = request.path_params["key"]
501 1
        circuit_ids = data.pop("circuit_ids")
502 1
        self.mongo_controller.update_evcs(circuit_ids, {key: ""}, "del")
503
504 1
        fail_evcs = []
505 1
        for _id in circuit_ids:
506 1
            try:
507 1
                evc = self.circuits[_id]
508 1
                evc.remove_metadata(key)
509
            except KeyError:
510
                fail_evcs.append(_id)
511
512 1
        if fail_evcs:
513
            raise HTTPException(404, detail=fail_evcs)
514 1
        return JSONResponse("Operation successful")
515
516 1
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
517 1
    def delete_metadata(self, request: Request) -> JSONResponse:
518
        """Delete metadata from an EVC."""
519 1
        circuit_id = request.path_params["circuit_id"]
520 1
        key = request.path_params["key"]
521 1
        try:
522 1
            evc = self.circuits[circuit_id]
523 1
        except KeyError as error:
524 1
            raise HTTPException(
525
                404,
526
                detail=f"circuit_id {circuit_id} not found."
527
            ) from error
528
529 1
        evc.remove_metadata(key)
530 1
        evc.sync()
531 1
        return JSONResponse("Operation successful")
532
533 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
534 1
    def redeploy(self, request: Request) -> JSONResponse:
535
        """Endpoint to force the redeployment of an EVC."""
536 1
        circuit_id = request.path_params["circuit_id"]
537 1
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
538 1
        try:
539 1
            evc = self.circuits[circuit_id]
540 1
        except KeyError:
541 1
            raise HTTPException(
542
                404,
543
                detail=f"circuit_id {circuit_id} not found"
544
            ) from KeyError
545 1
        if evc.is_enabled():
546 1
            with evc.lock:
547 1
                evc.remove_current_flows()
548 1
                evc.deploy()
549 1
            result = {"response": f"Circuit {circuit_id} redeploy received."}
550 1
            status = 202
551
        else:
552 1
            result = {"response": f"Circuit {circuit_id} is disabled."}
553 1
            status = 409
554
555 1
        return JSONResponse(result, status_code=status)
556
557 1
    @rest("/v2/evc/schedule/", methods=["POST"])
558 1
    @validate_openapi(spec)
559 1
    def create_schedule(self, request: Request) -> JSONResponse:
560
        """
561
        Create a new schedule for a given circuit.
562
563
        This service do no check if there are conflicts with another schedule.
564
        Payload example:
565
            {
566
              "circuit_id":"aa:bb:cc",
567
              "schedule": {
568
                "date": "2019-08-07T14:52:10.967Z",
569
                "interval": "string",
570
                "frequency": "1 * * * *",
571
                "action": "create"
572
              }
573
            }
574
        """
575 1
        log.debug("create_schedule /v2/evc/schedule/")
576 1
        data = get_json_or_400(request, self.controller.loop)
577 1
        circuit_id = data["circuit_id"]
578 1
        schedule_data = data["schedule"]
579
580
        # Get EVC from circuits buffer
581 1
        circuits = self._get_circuits_buffer()
582
583
        # get the circuit
584 1
        evc = circuits.get(circuit_id)
585
586
        # get the circuit
587 1
        if not evc:
588 1
            result = f"circuit_id {circuit_id} not found"
589 1
            log.debug("create_schedule result %s %s", result, 404)
590 1
            raise HTTPException(404, detail=result)
591
        # Can not modify circuits deleted and archived
592 1
        if evc.archived:
593 1
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
594 1
            log.debug("create_schedule result %s %s", result, 409)
595 1
            raise HTTPException(409, detail=result)
596
597
        # new schedule from dict
598 1
        new_schedule = CircuitSchedule.from_dict(schedule_data)
599
600
        # If there is no schedule, create the list
601 1
        if not evc.circuit_scheduler:
602 1
            evc.circuit_scheduler = []
603
604
        # Add the new schedule
605 1
        evc.circuit_scheduler.append(new_schedule)
606
607
        # Add schedule job
608 1
        self.sched.add_circuit_job(evc, new_schedule)
609
610
        # save circuit to mongodb
611 1
        evc.sync()
612
613 1
        result = new_schedule.as_dict()
614 1
        status = 201
615
616 1
        log.debug("create_schedule result %s %s", result, status)
617 1
        return JSONResponse(result, status_code=status)
618
619 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
620 1
    @validate_openapi(spec)
621 1
    def update_schedule(self, request: Request) -> JSONResponse:
622
        """Update a schedule.
623
624
        Change all attributes from the given schedule from a EVC circuit.
625
        The schedule ID is preserved as default.
626
        Payload example:
627
            {
628
              "date": "2019-08-07T14:52:10.967Z",
629
              "interval": "string",
630
              "frequency": "1 * * *",
631
              "action": "create"
632
            }
633
        """
634 1
        data = get_json_or_400(request, self.controller.loop)
635 1
        schedule_id = request.path_params["schedule_id"]
636 1
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)
637
638
        # Try to find a circuit schedule
639 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
640
641
        # Can not modify circuits deleted and archived
642 1
        if not found_schedule:
643 1
            result = f"schedule_id {schedule_id} not found"
644 1
            log.debug("update_schedule result %s %s", result, 404)
645 1
            raise HTTPException(404, detail=result)
646 1
        if evc.archived:
647 1
            result = f"Circuit {evc.id} is archived. Update is forbidden."
648 1
            log.debug("update_schedule result %s %s", result, 409)
649 1
            raise HTTPException(409, detail=result)
650
651 1
        new_schedule = CircuitSchedule.from_dict(data)
652 1
        new_schedule.id = found_schedule.id
653
        # Remove the old schedule
654 1
        evc.circuit_scheduler.remove(found_schedule)
655
        # Append the modified schedule
656 1
        evc.circuit_scheduler.append(new_schedule)
657
658
        # Cancel all schedule jobs
659 1
        self.sched.cancel_job(found_schedule.id)
660
        # Add the new circuit schedule
661 1
        self.sched.add_circuit_job(evc, new_schedule)
662
        # Save EVC to mongodb
663 1
        evc.sync()
664
665 1
        result = new_schedule.as_dict()
666 1
        status = 200
667
668 1
        log.debug("update_schedule result %s %s", result, status)
669 1
        return JSONResponse(result, status_code=status)
670
671 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
672 1
    def delete_schedule(self, request: Request) -> JSONResponse:
673
        """Remove a circuit schedule.
674
675
        Remove the Schedule from EVC.
676
        Remove the Schedule from cron job.
677
        Save the EVC to the Storehouse.
678
        """
679 1
        schedule_id = request.path_params["schedule_id"]
680 1
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
681 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
682
683
        # Can not modify circuits deleted and archived
684 1
        if not found_schedule:
685 1
            result = f"schedule_id {schedule_id} not found"
686 1
            log.debug("delete_schedule result %s %s", result, 404)
687 1
            raise HTTPException(404, detail=result)
688
689 1
        if evc.archived:
690 1
            result = f"Circuit {evc.id} is archived. Update is forbidden."
691 1
            log.debug("delete_schedule result %s %s", result, 409)
692 1
            raise HTTPException(409, detail=result)
693
694
        # Remove the old schedule
695 1
        evc.circuit_scheduler.remove(found_schedule)
696
697
        # Cancel all schedule jobs
698 1
        self.sched.cancel_job(found_schedule.id)
699
        # Save EVC to mongodb
700 1
        evc.sync()
701
702 1
        result = "Schedule removed"
703 1
        status = 200
704
705 1
        log.debug("delete_schedule result %s %s", result, status)
706 1
        return JSONResponse(result, status_code=status)
707
708 1
    def _is_duplicated_evc(self, evc):
709
        """Verify if the circuit given is duplicated with the stored evcs.
710
711
        Args:
712
            evc (EVC): circuit to be analysed.
713
714
        Returns:
715
            boolean: True if the circuit is duplicated, otherwise False.
716
717
        """
718 1
        for circuit in tuple(self.circuits.values()):
719 1
            if not circuit.archived and circuit.shares_uni(evc):
720 1
                return True
721 1
        return False
722
723 1
    @listen_to("kytos/topology.link_up")
724 1
    def on_link_up(self, event):
725
        """Change circuit when link is up or end_maintenance."""
726
        self.handle_link_up(event)
727
728 1
    def handle_link_up(self, event):
729
        """Change circuit when link is up or end_maintenance."""
730 1
        log.info("Event handle_link_up %s", event.content["link"])
731 1
        for evc in self.get_evcs_by_svc_level():
732 1
            if evc.is_enabled() and not evc.archived:
733 1
                with evc.lock:
734 1
                    evc.handle_link_up(event.content["link"])
735
736
    # Possibly replace this with interruptions?
737 1
    @listen_to(
738
        '.*.switch.interface.(link_up|link_down|created|deleted)',
739
        pool='dynamic_single'
740
    )
741 1
    def on_interface_link_change(self, event: KytosEvent):
742
        """
743
        Handler for interface link_up and link_down events
744
        """
745
        with self._lock:
746
            _, _, event_type = event.name.rpartition('.')
747
            iface = event.content.get("interface")
748
            if event_type in ('link_up', 'created'):
749
                self.handle_interface_link_up(iface)
750
            elif event_type in ('link_down', 'deleted'):
751
                self.handle_interface_link_down(iface)
752
753 1
    def handle_interface_link_up(self, interface):
754
        """
755
        Handler for interface link_up events
756
        """
757
        for evc in self.get_evcs_by_svc_level():
758
            log.info("Event handle_interface_link_up %s", interface)
759
            evc.handle_interface_link_up(
760
                interface
761
            )
762
763 1
    def handle_interface_link_down(self, interface):
764
        """
765
        Handler for interface link_down events
766
        """
767
        for evc in self.get_evcs_by_svc_level():
768
            log.info("Event handle_interface_link_down %s", interface)
769
            evc.handle_interface_link_down(
770
                interface
771
            )
772
773 1
    @listen_to("kytos/topology.link_down")
774 1
    def on_link_down(self, event):
775
        """Change circuit when link is down or under_mantenance."""
776
        self.handle_link_down(event)
777
778 1
    def handle_link_down(self, event):  # pylint: disable=too-many-branches
779
        """Change circuit when link is down or under_mantenance."""
780 1
        link = event.content["link"]
781 1
        log.info("Event handle_link_down %s", link)
782 1
        switch_flows = {}
783 1
        evcs_with_failover = []
784 1
        evcs_normal = []
785 1
        check_failover = []
786 1
        for evc in self.get_evcs_by_svc_level():
787 1
            if evc.is_affected_by_link(link):
788
                # if there is no failover path, handles link down the
789
                # tradditional way
790 1
                if (
791
                    not getattr(evc, 'failover_path', None) or
792
                    evc.is_failover_path_affected_by_link(link)
793
                ):
794 1
                    evcs_normal.append(evc)
795 1
                    continue
796 1
                try:
797 1
                    dpid_flows = evc.get_failover_flows()
798
                # pylint: disable=broad-except
799 1
                except Exception:
800 1
                    err = traceback.format_exc().replace("\n", ", ")
801 1
                    log.error(
802
                        f"Ignore Failover path for {evc} due to error: {err}"
803
                    )
804 1
                    evcs_normal.append(evc)
805 1
                    continue
806 1
                for dpid, flows in dpid_flows.items():
807 1
                    switch_flows.setdefault(dpid, [])
808 1
                    switch_flows[dpid].extend(flows)
809 1
                evcs_with_failover.append(evc)
810
            else:
811 1
                check_failover.append(evc)
812
813 1
        while switch_flows:
814 1
            offset = settings.BATCH_SIZE or None
815 1
            switches = list(switch_flows.keys())
816 1
            for dpid in switches:
817 1
                emit_event(
818
                    self.controller,
819
                    context="kytos.flow_manager",
820
                    name="flows.install",
821
                    content={
822
                        "dpid": dpid,
823
                        "flow_dict": {"flows": switch_flows[dpid][:offset]},
824
                    }
825
                )
826 1
                if offset is None or offset >= len(switch_flows[dpid]):
827 1
                    del switch_flows[dpid]
828 1
                    continue
829 1
                switch_flows[dpid] = switch_flows[dpid][offset:]
830 1
            time.sleep(settings.BATCH_INTERVAL)
831
832 1
        for evc in evcs_with_failover:
833 1
            with evc.lock:
834 1
                old_path = evc.current_path
835 1
                evc.current_path = evc.failover_path
836 1
                evc.failover_path = old_path
837 1
                evc.sync()
838 1
            emit_event(self.controller, "redeployed_link_down",
839
                       content=map_evc_event_content(evc))
840 1
            log.info(
841
                f"{evc} redeployed with failover due to link down {link.id}"
842
            )
843
844 1
        for evc in evcs_normal:
845 1
            emit_event(
846
                self.controller,
847
                "evc_affected_by_link_down",
848
                content={"link_id": link.id} | map_evc_event_content(evc)
849
            )
850
851
        # After handling the hot path, check if new failover paths are needed.
852
        # Note that EVCs affected by link down will generate a KytosEvent for
853
        # deployed|redeployed, which will trigger the failover path setup.
854
        # Thus, we just need to further check the check_failover list
855 1
        for evc in check_failover:
856 1
            if evc.is_failover_path_affected_by_link(link):
857 1
                with evc.lock:
858 1
                    evc.setup_failover_path()
859
860 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
861 1
    def on_evc_affected_by_link_down(self, event):
862
        """Change circuit when link is down or under_mantenance."""
863
        self.handle_evc_affected_by_link_down(event)
864
865 1
    def handle_evc_affected_by_link_down(self, event):
866
        """Change circuit when link is down or under_mantenance."""
867 1
        evc = self.circuits.get(event.content["evc_id"])
868 1
        link_id = event.content['link_id']
869 1
        if not evc:
870 1
            return
871 1
        with evc.lock:
872 1
            result = evc.handle_link_down()
873 1
        event_name = "error_redeploy_link_down"
874 1
        if result:
875 1
            log.info(f"{evc} redeployed due to link down {link_id}")
876 1
            event_name = "redeployed_link_down"
877 1
        emit_event(self.controller, event_name,
878
                   content=map_evc_event_content(evc))
879
880 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
881 1
    def on_evc_deployed(self, event):
882
        """Handle EVC deployed|redeployed_link_down."""
883
        self.handle_evc_deployed(event)
884
885 1
    def handle_evc_deployed(self, event):
886
        """Setup failover path on evc deployed."""
887 1
        evc = self.circuits.get(event.content["evc_id"])
888 1
        if not evc:
889 1
            return
890 1
        with evc.lock:
891 1
            evc.setup_failover_path()
892
893 1
    @listen_to("kytos/topology.topology_loaded")
894 1
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
895
        """Load EVCs once the topology is available."""
896
        self.load_all_evcs()
897
898 1
    def load_all_evcs(self):
899
        """Try to load all EVCs on startup."""
900 1
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
901 1
        for circuit_id, circuit in circuits:
902 1
            if circuit_id not in self.circuits:
903 1
                self._load_evc(circuit)
904
905 1
    def _load_evc(self, circuit_dict):
906
        """Load one EVC from mongodb to memory."""
907 1
        try:
908 1
            evc = self._evc_from_dict(circuit_dict)
909 1
        except ValueError as exception:
910 1
            log.error(
911
                f"Could not load EVC: dict={circuit_dict} error={exception}"
912
            )
913 1
            return None
914
915 1
        if evc.archived:
916 1
            return None
917
918 1
        self.circuits.setdefault(evc.id, evc)
919 1
        self.sched.add(evc)
920 1
        return evc
921
922 1
    @listen_to("kytos/flow_manager.flow.error")
923 1
    def on_flow_mod_error(self, event):
924
        """Handle flow mod errors related to an EVC."""
925
        self.handle_flow_mod_error(event)
926
927 1
    def handle_flow_mod_error(self, event):
928
        """Handle flow mod errors related to an EVC."""
929 1
        flow = event.content["flow"]
930 1
        command = event.content.get("error_command")
931 1
        if command != "add":
932
            return
933 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
934 1
        if evc:
935 1
            with evc.lock:
936 1
                evc.remove_current_flows()
937
938 1
    def _evc_dict_with_instances(self, evc_dict):
939
        """Convert some dict values to instance of EVC classes.
940
941
        This method will convert: [UNI, Link]
942
        """
943 1
        data = evc_dict.copy()  # Do not modify the original dict
944 1
        for attribute, value in data.items():
945
            # Get multiple attributes.
946
            # Ex: uni_a, uni_z
947 1
            if "uni" in attribute:
948 1
                try:
949 1
                    data[attribute] = self._uni_from_dict(value)
950 1
                except ValueError as exception:
951 1
                    result = "Error creating UNI: Invalid value"
952 1
                    raise ValueError(result) from exception
953
954 1
            if attribute == "circuit_scheduler":
955 1
                data[attribute] = []
956 1
                for schedule in value:
957 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
958
959
            # Get multiple attributes.
960
            # Ex: primary_links,
961
            #     backup_links,
962
            #     current_links_cache,
963
            #     primary_links_cache,
964
            #     backup_links_cache
965 1
            if "links" in attribute:
966 1
                data[attribute] = [
967
                    self._link_from_dict(link) for link in value
968
                ]
969
970
            # Ex: current_path,
971
            #     primary_path,
972
            #     backup_path
973 1
            if "path" in attribute and attribute != "dynamic_backup_path":
974 1
                data[attribute] = Path(
975
                    [self._link_from_dict(link) for link in value]
976
                )
977
978 1
        return data
979
980 1
    def _evc_from_dict(self, evc_dict):
981 1
        data = self._evc_dict_with_instances(evc_dict)
982 1
        data["table_group"] = self.table_group
983 1
        return EVC(self.controller, **data)
984
985 1
    def _uni_from_dict(self, uni_dict):
986
        """Return a UNI object from python dict."""
987 1
        if uni_dict is None:
988 1
            return False
989
990 1
        interface_id = uni_dict.get("interface_id")
991 1
        interface = self.controller.get_interface_by_id(interface_id)
992 1
        if interface is None:
993 1
            result = (
994
                "Error creating UNI:"
995
                + f"Could not instantiate interface {interface_id}"
996
            )
997 1
            raise ValueError(result) from ValueError
998 1
        tag_convert = {1: "vlan"}
999 1
        tag_dict = uni_dict.get("tag", None)
1000 1
        if tag_dict:
1001 1
            tag_type = tag_dict.get("tag_type")
1002 1
            tag_type = tag_convert.get(tag_type, tag_type)
1003 1
            tag_value = tag_dict.get("value")
1004 1
            tag = TAG(tag_type, tag_value)
1005
        else:
1006 1
            tag = None
1007 1
        uni = UNI(interface, tag)
1008
1009 1
        return uni
1010
1011 1
    def _link_from_dict(self, link_dict):
1012
        """Return a Link object from python dict."""
1013 1
        id_a = link_dict.get("endpoint_a").get("id")
1014 1
        id_b = link_dict.get("endpoint_b").get("id")
1015
1016 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
1017 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
1018 1
        if not endpoint_a:
1019 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
1020 1
            raise ValueError(error_msg)
1021 1
        if not endpoint_b:
1022
            error_msg = f"Could not get interface endpoint_b id {id_b}"
1023
            raise ValueError(error_msg)
1024
1025 1
        link = Link(endpoint_a, endpoint_b)
1026 1
        if "metadata" in link_dict:
1027 1
            link.extend_metadata(link_dict.get("metadata"))
1028
1029 1
        s_vlan = link.get_metadata("s_vlan")
1030 1
        if s_vlan:
1031 1
            tag = TAG.from_dict(s_vlan)
1032 1
            if tag is False:
1033
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
1034
                raise ValueError(error_msg)
1035 1
            link.update_metadata("s_vlan", tag)
1036 1
        return link
1037
1038 1
    def _find_evc_by_schedule_id(self, schedule_id):
1039
        """
1040
        Find an EVC and CircuitSchedule based on schedule_id.
1041
1042
        :param schedule_id: Schedule ID
1043
        :return: EVC and Schedule
1044
        """
1045 1
        circuits = self._get_circuits_buffer()
1046 1
        found_schedule = None
1047 1
        evc = None
1048
1049
        # pylint: disable=unused-variable
1050 1
        for c_id, circuit in circuits.items():
1051 1
            for schedule in circuit.circuit_scheduler:
1052 1
                if schedule.id == schedule_id:
1053 1
                    found_schedule = schedule
1054 1
                    evc = circuit
1055 1
                    break
1056 1
            if found_schedule:
1057 1
                break
1058 1
        return evc, found_schedule
1059
1060 1
    def _get_circuits_buffer(self):
1061
        """
1062
        Return the circuit buffer.
1063
1064
        If the buffer is empty, try to load data from mongodb.
1065
        """
1066 1
        if not self.circuits:
1067
            # Load circuits from mongodb to buffer
1068 1
            circuits = self.mongo_controller.get_circuits()['circuits']
1069 1
            for c_id, circuit in circuits.items():
1070 1
                evc = self._evc_from_dict(circuit)
1071 1
                self.circuits[c_id] = evc
1072 1
        return self.circuits
1073
1074
    # pylint: disable=attribute-defined-outside-init
1075 1
    @alisten_to("kytos/of_multi_table.enable_table")
1076 1
    async def on_table_enabled(self, event):
1077
        """Handle a recently table enabled."""
1078 1
        table_group = event.content.get("mef_eline", None)
1079 1
        if not table_group:
1080
            return
1081 1
        for group in table_group:
1082 1
            if group not in settings.TABLE_GROUP_ALLOWED:
1083 1
                log.error(f'The table group "{group}" is not allowed for '
1084
                          f'mef_eline. Allowed table groups are '
1085
                          f'{settings.TABLE_GROUP_ALLOWED}')
1086 1
                return
1087 1
        self.table_group.update(table_group)
1088 1
        content = {"group_table": self.table_group}
1089 1
        name = "kytos/mef_eline.enable_table"
1090
        await aemit_event(self.controller, name, content)
1091