Passed
Pull Request — master (#375)
by Italo Valcy
04:00
created

build.main.Main.on_link_up()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1.037

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 2
dl 0
loc 4
ccs 2
cts 3
cp 0.6667
crap 1.037
rs 10
c 0
b 0
f 0
1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
from threading import Lock
9
10 1
from pydantic import ValidationError
11
12 1
from kytos.core import KytosNApp, log, rest
13 1
from kytos.core.helpers import (alisten_to, listen_to, load_spec,
14
                                validate_openapi)
15 1
from kytos.core.interface import TAG, UNI
16 1
from kytos.core.link import Link
17 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
18
                                 get_json_or_400)
19 1
from napps.kytos.mef_eline import controllers, settings
20 1
from napps.kytos.mef_eline.exceptions import DisabledSwitch, InvalidPath
21 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
22
                                          Path)
23 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
24 1
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
25
                                         emit_event, map_evc_event_content)
26
27
28
# pylint: disable=too-many-public-methods
29 1
class Main(KytosNApp):
30
    """Main class of amlight/mef_eline NApp.
31
32
    This class is the entry point for this napp.
33
    """
34
35 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
36
37 1
    def setup(self):
38
        """Replace the '__init__' method for the KytosNApp subclass.
39
40
        The setup method is automatically called by the controller when your
41
        application is loaded.
42
43
        So, if you have any setup routine, insert it here.
44
        """
45
        # object used to scheduler circuit events
46 1
        self.sched = Scheduler()
47
48
        # object to save and load circuits
49 1
        self.mongo_controller = self.get_eline_controller()
50 1
        self.mongo_controller.bootstrap_indexes()
51
52
        # set the controller that will manager the dynamic paths
53 1
        DynamicPathManager.set_controller(self.controller)
54
55
        # dictionary of EVCs created. It acts as a circuit buffer.
56
        # Every create/update/delete must be synced to mongodb.
57 1
        self.circuits = {}
58
59 1
        self.table_group = {"epl": 0, "evpl": 0}
60 1
        self._lock = Lock()
61 1
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)
62
63 1
        self.load_all_evcs()
64 1
        self._topology_updated_at = None
65
66 1
    def get_evcs_by_svc_level(self) -> list:
67
        """Get circuits sorted by desc service level and asc creation_time.
68
69
        In the future, as more ops are offloaded it should be get from the DB.
70
        """
71 1
        return sorted(self.circuits.values(),
72
                      key=lambda x: (-x.service_level, x.creation_time))
73
74 1
    @staticmethod
75 1
    def get_eline_controller():
76
        """Return the ELineController instance."""
77
        return controllers.ELineController()
78
79 1
    def execute(self):
80
        """Execute once when the napp is running."""
81 1
        if self._lock.locked():
82 1
            return
83 1
        log.debug("Starting consistency routine")
84 1
        with self._lock:
85 1
            self.execute_consistency()
86 1
        log.debug("Finished consistency routine")
87
88 1
    def should_be_checked(self, circuit):
89
        "Verify if the circuit meets the necessary conditions to be checked"
90
        # pylint: disable=too-many-boolean-expressions
91 1
        if (
92
                circuit.is_enabled()
93
                and not circuit.is_active()
94
                and not circuit.lock.locked()
95
                and not circuit.has_recent_removed_flow()
96
                and not circuit.is_recent_updated()
97
                and circuit.are_unis_active(self.controller.switches)
98
                # if a inter-switch EVC does not have current_path, it does not
99
                # make sense to run sdntrace on it
100
                and (circuit.is_intra_switch() or circuit.current_path)
101
                ):
102 1
            return True
103
        return False
104
105 1
    def execute_consistency(self):
106
        """Execute consistency routine."""
107 1
        circuits_to_check = []
108 1
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
109 1
        for circuit in self.get_evcs_by_svc_level():
110 1
            stored_circuits.pop(circuit.id, None)
111 1
            if self.should_be_checked(circuit):
112 1
                circuits_to_check.append(circuit)
113 1
        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
114 1
        for circuit in circuits_to_check:
115 1
            is_checked = circuits_checked.get(circuit.id)
116 1
            if is_checked:
117 1
                circuit.execution_rounds = 0
118 1
                log.info(f"{circuit} enabled but inactive - activating")
119 1
                with circuit.lock:
120 1
                    circuit.activate()
121 1
                    circuit.sync()
122
            else:
123 1
                circuit.execution_rounds += 1
124 1
                if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
125 1
                    log.info(f"{circuit} enabled but inactive - redeploy")
126 1
                    with circuit.lock:
127 1
                        circuit.deploy()
128 1
        for circuit_id in stored_circuits:
129 1
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
130 1
            self._load_evc(stored_circuits[circuit_id])
131
132 1
    def shutdown(self):
133
        """Execute when your napp is unloaded.
134
135
        If you have some cleanup procedure, insert it here.
136
        """
137
138 1
    @rest("/v2/evc/", methods=["GET"])
139 1
    def list_circuits(self, request: Request) -> JSONResponse:
140
        """Endpoint to return circuits stored.
141
142
        archive query arg if defined (not null) will be filtered
143
        accordingly, by default only non archived evcs will be listed
144
        """
145 1
        log.debug("list_circuits /v2/evc")
146 1
        args = request.query_params
147 1
        archived = args.get("archived", "false").lower()
148 1
        args = {k: v for k, v in args.items() if k not in {"archived"}}
149 1
        circuits = self.mongo_controller.get_circuits(archived=archived,
150
                                                      metadata=args)
151 1
        circuits = circuits['circuits']
152 1
        return JSONResponse(circuits)
153
154 1
    @rest("/v2/evc/schedule", methods=["GET"])
155 1
    def list_schedules(self, _request: Request) -> JSONResponse:
156
        """Endpoint to return all schedules stored for all circuits.
157
158
        Return a JSON with the following template:
159
        [{"schedule_id": <schedule_id>,
160
         "circuit_id": <circuit_id>,
161
         "schedule": <schedule object>}]
162
        """
163 1
        log.debug("list_schedules /v2/evc/schedule")
164 1
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
165 1
        if not circuits:
166 1
            result = {}
167 1
            status = 200
168 1
            return JSONResponse(result, status_code=status)
169
170 1
        result = []
171 1
        status = 200
172 1
        for circuit in circuits:
173 1
            circuit_scheduler = circuit.get("circuit_scheduler")
174 1
            if circuit_scheduler:
175 1
                for scheduler in circuit_scheduler:
176 1
                    value = {
177
                        "schedule_id": scheduler.get("id"),
178
                        "circuit_id": circuit.get("id"),
179
                        "schedule": scheduler,
180
                    }
181 1
                    result.append(value)
182
183 1
        log.debug("list_schedules result %s %s", result, status)
184 1
        return JSONResponse(result, status_code=status)
185
186 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
187 1
    def get_circuit(self, request: Request) -> JSONResponse:
188
        """Endpoint to return a circuit based on id."""
189 1
        circuit_id = request.path_params["circuit_id"]
190 1
        log.debug("get_circuit /v2/evc/%s", circuit_id)
191 1
        circuit = self.mongo_controller.get_circuit(circuit_id)
192 1
        if not circuit:
193 1
            result = f"circuit_id {circuit_id} not found"
194 1
            log.debug("get_circuit result %s %s", result, 404)
195 1
            raise HTTPException(404, detail=result)
196 1
        status = 200
197 1
        log.debug("get_circuit result %s %s", circuit, status)
198 1
        return JSONResponse(circuit, status_code=status)
199
200 1
    @rest("/v2/evc/", methods=["POST"])
201 1
    @validate_openapi(spec)
202 1
    def create_circuit(self, request: Request) -> JSONResponse:
203
        """Try to create a new circuit.
204
205
        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
206
        UNI_Z's requested C-VID are available from the interfaces' pools. This
207
        is checked when creating the UNI object.
208
209
        Then, E-Line NApp requests a primary and a backup path to the
210
        Pathfinder NApp using the attributes primary_links and backup_links
211
        submitted via REST
212
213
        # For each link composing paths in #3:
214
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
215
        #  - Using the S-VID obtained, generate abstract flow entries to be
216
        #    sent to FlowManager
217
218
        Push abstract flow entries to FlowManager and FlowManager pushes
219
        OpenFlow entries to datapaths
220
221
        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
222
        creation
223
224
        Finnaly, notify user of the status of its request.
225
        """
226
        # Try to create the circuit object
227 1
        log.debug("create_circuit /v2/evc/")
228 1
        data = get_json_or_400(request, self.controller.loop)
229
230 1
        try:
231 1
            evc = self._evc_from_dict(data)
232 1
        except ValueError as exception:
233 1
            log.debug("create_circuit result %s %s", exception, 400)
234 1
            raise HTTPException(400, detail=str(exception)) from exception
235
236 1
        try:
237 1
            check_disabled_component(evc.uni_a, evc.uni_z)
238 1
        except DisabledSwitch as exception:
239 1
            log.debug("create_circuit result %s %s", exception, 409)
240 1
            raise HTTPException(
241
                    409,
242
                    detail=f"Path is not valid: {exception}"
243
                ) from exception
244
245 1
        if evc.primary_path:
246
            try:
247
                evc.primary_path.is_valid(
248
                    evc.uni_a.interface.switch,
249
                    evc.uni_z.interface.switch,
250
                    bool(evc.circuit_scheduler),
251
                )
252
            except InvalidPath as exception:
253
                raise HTTPException(
254
                    400,
255
                    detail=f"primary_path is not valid: {exception}"
256
                ) from exception
257 1
        if evc.backup_path:
258
            try:
259
                evc.backup_path.is_valid(
260
                    evc.uni_a.interface.switch,
261
                    evc.uni_z.interface.switch,
262
                    bool(evc.circuit_scheduler),
263
                )
264
            except InvalidPath as exception:
265
                raise HTTPException(
266
                    400,
267
                    detail=f"backup_path is not valid: {exception}"
268
                ) from exception
269
270
        # verify duplicated evc
271 1
        if self._is_duplicated_evc(evc):
272 1
            result = "The EVC already exists."
273 1
            log.debug("create_circuit result %s %s", result, 409)
274 1
            raise HTTPException(409, detail=result)
275
276 1
        try:
277 1
            evc._validate_has_primary_or_dynamic()
278 1
        except ValueError as exception:
279 1
            raise HTTPException(400, detail=str(exception)) from exception
280
281
        # save circuit
282 1
        try:
283 1
            evc.sync()
284
        except ValidationError as exception:
285
            raise HTTPException(400, detail=str(exception)) from exception
286
287
        # store circuit in dictionary
288 1
        self.circuits[evc.id] = evc
289
290
        # Schedule the circuit deploy
291 1
        self.sched.add(evc)
292
293
        # Circuit has no schedule, deploy now
294 1
        if not evc.circuit_scheduler:
295 1
            with evc.lock:
296 1
                evc.deploy()
297
298
        # Notify users
299 1
        result = {"circuit_id": evc.id}
300 1
        status = 201
301 1
        log.debug("create_circuit result %s %s", result, status)
302 1
        emit_event(self.controller, name="created",
303
                   content=map_evc_event_content(evc))
304 1
        return JSONResponse(result, status_code=status)
305
306 1
    @listen_to('kytos/flow_manager.flow.removed')
307 1
    def on_flow_delete(self, event):
308
        """Capture delete messages to keep track when flows got removed."""
309
        self.handle_flow_delete(event)
310
311 1
    def handle_flow_delete(self, event):
312
        """Keep track when the EVC got flows removed by deriving its cookie."""
313 1
        flow = event.content["flow"]
314 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
315 1
        if evc:
316 1
            log.debug("Flow removed in EVC %s", evc.id)
317 1
            evc.set_flow_removed_at()
318
319 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
320 1
    @validate_openapi(spec)
321 1
    def update(self, request: Request) -> JSONResponse:
322
        """Update a circuit based on payload.
323
324
        The EVC attributes (creation_time, active, current_path,
325
        failover_path, _id, archived) can't be updated.
326
        """
327 1
        data = get_json_or_400(request, self.controller.loop)
328 1
        circuit_id = request.path_params["circuit_id"]
329 1
        log.debug("update /v2/evc/%s", circuit_id)
330 1
        try:
331 1
            evc = self.circuits[circuit_id]
332 1
        except KeyError:
333 1
            result = f"circuit_id {circuit_id} not found"
334 1
            log.debug("update result %s %s", result, 404)
335 1
            raise HTTPException(404, detail=result) from KeyError
336
337 1
        if evc.archived:
338 1
            result = "Can't update archived EVC"
339 1
            log.debug("update result %s %s", result, 409)
340 1
            raise HTTPException(409, detail=result)
341
342 1
        try:
343 1
            enable, redeploy = evc.update(
344
                **self._evc_dict_with_instances(data)
345
            )
346 1
        except ValidationError as exception:
347
            raise HTTPException(400, detail=str(exception)) from exception
348 1
        except ValueError as exception:
349 1
            log.error(exception)
350 1
            log.debug("update result %s %s", exception, 400)
351 1
            raise HTTPException(400, detail=str(exception)) from exception
352 1
        except DisabledSwitch as exception:
353 1
            log.debug("update result %s %s", exception, 409)
354 1
            raise HTTPException(
355
                    409,
356
                    detail=f"Path is not valid: {exception}"
357
                ) from exception
358
359 1
        if evc.is_active():
360
            if enable is False:  # disable if active
361
                with evc.lock:
362
                    evc.remove()
363
            elif redeploy is not None:  # redeploy if active
364
                with evc.lock:
365
                    evc.remove()
366
                    evc.deploy()
367
        else:
368 1
            if enable is True:  # enable if inactive
369 1
                with evc.lock:
370 1
                    evc.deploy()
371 1
        result = {evc.id: evc.as_dict()}
372 1
        status = 200
373
374 1
        log.debug("update result %s %s", result, status)
375 1
        emit_event(self.controller, "updated",
376
                   content=map_evc_event_content(evc, **data))
377 1
        return JSONResponse(result, status_code=status)
378
379 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
380 1
    def delete_circuit(self, request: Request) -> JSONResponse:
381
        """Remove a circuit.
382
383
        First, the flows are removed from the switches, and then the EVC is
384
        disabled.
385
        """
386 1
        circuit_id = request.path_params["circuit_id"]
387 1
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
388 1
        try:
389 1
            evc = self.circuits[circuit_id]
390 1
        except KeyError:
391 1
            result = f"circuit_id {circuit_id} not found"
392 1
            log.debug("delete_circuit result %s %s", result, 404)
393 1
            raise HTTPException(404, detail=result) from KeyError
394
395 1
        if evc.archived:
396 1
            result = f"Circuit {circuit_id} already removed"
397 1
            log.debug("delete_circuit result %s %s", result, 404)
398 1
            raise HTTPException(404, detail=result)
399
400 1
        log.info("Removing %s", evc)
401 1
        with evc.lock:
402 1
            evc.remove_current_flows()
403 1
            evc.remove_failover_flows(sync=False)
404 1
            evc.deactivate()
405 1
            evc.disable()
406 1
            self.sched.remove(evc)
407 1
            evc.archive()
408 1
            evc.sync()
409 1
        log.info("EVC removed. %s", evc)
410 1
        result = {"response": f"Circuit {circuit_id} removed"}
411 1
        status = 200
412
413 1
        log.debug("delete_circuit result %s %s", result, status)
414 1
        emit_event(self.controller, "deleted",
415
                   content=map_evc_event_content(evc))
416 1
        return JSONResponse(result, status_code=status)
417
418 1
    @rest("v2/evc/{circuit_id}/metadata", methods=["GET"])
419 1
    def get_metadata(self, request: Request) -> JSONResponse:
420
        """Get metadata from an EVC."""
421 1
        circuit_id = request.path_params["circuit_id"]
422 1
        try:
423 1
            return (
424
                JSONResponse({"metadata": self.circuits[circuit_id].metadata})
425
            )
426
        except KeyError as error:
427
            raise HTTPException(
428
                404,
429
                detail=f"circuit_id {circuit_id} not found."
430
            ) from error
431
432 1 View Code Duplication
    @rest("v2/evc/metadata", methods=["POST"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
433 1
    @validate_openapi(spec)
434 1
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
435
        """Add metadata to a bulk of EVCs."""
436 1
        data = get_json_or_400(request, self.controller.loop)
437 1
        circuit_ids = data.pop("circuit_ids")
438
439 1
        self.mongo_controller.update_evcs(circuit_ids, data, "add")
440
441 1
        fail_evcs = []
442 1
        for _id in circuit_ids:
443 1
            try:
444 1
                evc = self.circuits[_id]
445 1
                evc.extend_metadata(data)
446 1
            except KeyError:
447 1
                fail_evcs.append(_id)
448
449 1
        if fail_evcs:
450 1
            raise HTTPException(404, detail=fail_evcs)
451 1
        return JSONResponse("Operation successful", status_code=201)
452
453 1
    @rest("v2/evc/{circuit_id}/metadata", methods=["POST"])
454 1
    @validate_openapi(spec)
455 1
    def add_metadata(self, request: Request) -> JSONResponse:
456
        """Add metadata to an EVC."""
457 1
        circuit_id = request.path_params["circuit_id"]
458 1
        metadata = get_json_or_400(request, self.controller.loop)
459 1
        if not isinstance(metadata, dict):
460
            raise HTTPException(400, "Invalid metadata value: {metadata}")
461 1
        try:
462 1
            evc = self.circuits[circuit_id]
463 1
        except KeyError as error:
464 1
            raise HTTPException(
465
                404,
466
                detail=f"circuit_id {circuit_id} not found."
467
            ) from error
468
469 1
        evc.extend_metadata(metadata)
470 1
        evc.sync()
471 1
        return JSONResponse("Operation successful", status_code=201)
472
473 1 View Code Duplication
    @rest("v2/evc/metadata/{key}", methods=["DELETE"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
474 1
    @validate_openapi(spec)
475 1
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
476
        """Delete metada from a bulk of EVCs"""
477 1
        data = get_json_or_400(request, self.controller.loop)
478 1
        key = request.path_params["key"]
479 1
        circuit_ids = data.pop("circuit_ids")
480 1
        self.mongo_controller.update_evcs(circuit_ids, {key: ""}, "del")
481
482 1
        fail_evcs = []
483 1
        for _id in circuit_ids:
484 1
            try:
485 1
                evc = self.circuits[_id]
486 1
                evc.remove_metadata(key)
487
            except KeyError:
488
                fail_evcs.append(_id)
489
490 1
        if fail_evcs:
491
            raise HTTPException(404, detail=fail_evcs)
492 1
        return JSONResponse("Operation successful")
493
494 1
    @rest("v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
495 1
    def delete_metadata(self, request: Request) -> JSONResponse:
496
        """Delete metadata from an EVC."""
497 1
        circuit_id = request.path_params["circuit_id"]
498 1
        key = request.path_params["key"]
499 1
        try:
500 1
            evc = self.circuits[circuit_id]
501 1
        except KeyError as error:
502 1
            raise HTTPException(
503
                404,
504
                detail=f"circuit_id {circuit_id} not found."
505
            ) from error
506
507 1
        evc.remove_metadata(key)
508 1
        evc.sync()
509 1
        return JSONResponse("Operation successful")
510
511 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
512 1
    def redeploy(self, request: Request) -> JSONResponse:
513
        """Endpoint to force the redeployment of an EVC."""
514 1
        circuit_id = request.path_params["circuit_id"]
515 1
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
516 1
        try:
517 1
            evc = self.circuits[circuit_id]
518 1
        except KeyError:
519 1
            raise HTTPException(
520
                404,
521
                detail=f"circuit_id {circuit_id} not found"
522
            ) from KeyError
523 1
        if evc.is_enabled():
524 1
            with evc.lock:
525 1
                evc.remove_current_flows()
526 1
                evc.deploy()
527 1
            result = {"response": f"Circuit {circuit_id} redeploy received."}
528 1
            status = 202
529
        else:
530 1
            result = {"response": f"Circuit {circuit_id} is disabled."}
531 1
            status = 409
532
533 1
        return JSONResponse(result, status_code=status)
534
535 1
    @rest("/v2/evc/schedule/", methods=["POST"])
536 1
    @validate_openapi(spec)
537 1
    def create_schedule(self, request: Request) -> JSONResponse:
538
        """
539
        Create a new schedule for a given circuit.
540
541
        This service do no check if there are conflicts with another schedule.
542
        Payload example:
543
            {
544
              "circuit_id":"aa:bb:cc",
545
              "schedule": {
546
                "date": "2019-08-07T14:52:10.967Z",
547
                "interval": "string",
548
                "frequency": "1 * * * *",
549
                "action": "create"
550
              }
551
            }
552
        """
553 1
        log.debug("create_schedule /v2/evc/schedule/")
554 1
        data = get_json_or_400(request, self.controller.loop)
555 1
        circuit_id = data["circuit_id"]
556 1
        schedule_data = data["schedule"]
557
558
        # Get EVC from circuits buffer
559 1
        circuits = self._get_circuits_buffer()
560
561
        # get the circuit
562 1
        evc = circuits.get(circuit_id)
563
564
        # get the circuit
565 1
        if not evc:
566 1
            result = f"circuit_id {circuit_id} not found"
567 1
            log.debug("create_schedule result %s %s", result, 404)
568 1
            raise HTTPException(404, detail=result)
569
        # Can not modify circuits deleted and archived
570 1
        if evc.archived:
571 1
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
572 1
            log.debug("create_schedule result %s %s", result, 409)
573 1
            raise HTTPException(409, detail=result)
574
575
        # new schedule from dict
576 1
        new_schedule = CircuitSchedule.from_dict(schedule_data)
577
578
        # If there is no schedule, create the list
579 1
        if not evc.circuit_scheduler:
580 1
            evc.circuit_scheduler = []
581
582
        # Add the new schedule
583 1
        evc.circuit_scheduler.append(new_schedule)
584
585
        # Add schedule job
586 1
        self.sched.add_circuit_job(evc, new_schedule)
587
588
        # save circuit to mongodb
589 1
        evc.sync()
590
591 1
        result = new_schedule.as_dict()
592 1
        status = 201
593
594 1
        log.debug("create_schedule result %s %s", result, status)
595 1
        return JSONResponse(result, status_code=status)
596
597 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
598 1
    @validate_openapi(spec)
599 1
    def update_schedule(self, request: Request) -> JSONResponse:
600
        """Update a schedule.
601
602
        Change all attributes from the given schedule from a EVC circuit.
603
        The schedule ID is preserved as default.
604
        Payload example:
605
            {
606
              "date": "2019-08-07T14:52:10.967Z",
607
              "interval": "string",
608
              "frequency": "1 * * *",
609
              "action": "create"
610
            }
611
        """
612 1
        data = get_json_or_400(request, self.controller.loop)
613 1
        schedule_id = request.path_params["schedule_id"]
614 1
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)
615
616
        # Try to find a circuit schedule
617 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
618
619
        # Can not modify circuits deleted and archived
620 1
        if not found_schedule:
621 1
            result = f"schedule_id {schedule_id} not found"
622 1
            log.debug("update_schedule result %s %s", result, 404)
623 1
            raise HTTPException(404, detail=result)
624 1
        if evc.archived:
625 1
            result = f"Circuit {evc.id} is archived. Update is forbidden."
626 1
            log.debug("update_schedule result %s %s", result, 409)
627 1
            raise HTTPException(409, detail=result)
628
629 1
        new_schedule = CircuitSchedule.from_dict(data)
630 1
        new_schedule.id = found_schedule.id
631
        # Remove the old schedule
632 1
        evc.circuit_scheduler.remove(found_schedule)
633
        # Append the modified schedule
634 1
        evc.circuit_scheduler.append(new_schedule)
635
636
        # Cancel all schedule jobs
637 1
        self.sched.cancel_job(found_schedule.id)
638
        # Add the new circuit schedule
639 1
        self.sched.add_circuit_job(evc, new_schedule)
640
        # Save EVC to mongodb
641 1
        evc.sync()
642
643 1
        result = new_schedule.as_dict()
644 1
        status = 200
645
646 1
        log.debug("update_schedule result %s %s", result, status)
647 1
        return JSONResponse(result, status_code=status)
648
649 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
650 1
    def delete_schedule(self, request: Request) -> JSONResponse:
651
        """Remove a circuit schedule.
652
653
        Remove the Schedule from EVC.
654
        Remove the Schedule from cron job.
655
        Save the EVC to the Storehouse.
656
        """
657 1
        schedule_id = request.path_params["schedule_id"]
658 1
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
659 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
660
661
        # Can not modify circuits deleted and archived
662 1
        if not found_schedule:
663 1
            result = f"schedule_id {schedule_id} not found"
664 1
            log.debug("delete_schedule result %s %s", result, 404)
665 1
            raise HTTPException(404, detail=result)
666
667 1
        if evc.archived:
668 1
            result = f"Circuit {evc.id} is archived. Update is forbidden."
669 1
            log.debug("delete_schedule result %s %s", result, 409)
670 1
            raise HTTPException(409, detail=result)
671
672
        # Remove the old schedule
673 1
        evc.circuit_scheduler.remove(found_schedule)
674
675
        # Cancel all schedule jobs
676 1
        self.sched.cancel_job(found_schedule.id)
677
        # Save EVC to mongodb
678 1
        evc.sync()
679
680 1
        result = "Schedule removed"
681 1
        status = 200
682
683 1
        log.debug("delete_schedule result %s %s", result, status)
684 1
        return JSONResponse(result, status_code=status)
685
686 1
    def _is_duplicated_evc(self, evc):
687
        """Verify if the circuit given is duplicated with the stored evcs.
688
689
        Args:
690
            evc (EVC): circuit to be analysed.
691
692
        Returns:
693
            boolean: True if the circuit is duplicated, otherwise False.
694
695
        """
696 1
        for circuit in tuple(self.circuits.values()):
697 1
            if not circuit.archived and circuit.shares_uni(evc):
698 1
                return True
699 1
        return False
700
701 1
    @listen_to("kytos/topology.link_up")
702 1
    def on_link_up(self, event):
703
        """Change circuit when link is up or end_maintenance."""
704
        self.handle_link_up(event)
705
706 1
    def handle_link_up(self, event):
707
        """Change circuit when link is up or end_maintenance."""
708 1
        log.info("Event handle_link_up %s", event.content["link"])
709 1
        for evc in self.get_evcs_by_svc_level():
710 1
            if evc.is_enabled() and not evc.archived:
711 1
                with evc.lock:
712 1
                    evc.handle_link_up(event.content["link"])
713
714 1
    @listen_to("kytos/topology.updated")
715 1
    def on_topology_update(self, event):
716
        """Capture topology update event"""
717
        self.handle_topology_update(event)
718
719 1
    def handle_topology_update(self, event):
720
        """Handle topology update"""
721 1
        with self._lock:
722 1
            if (
723
                self._topology_updated_at
724
                and self._topology_updated_at > event.timestamp
725
            ):
726
                return
727 1
            self._topology_updated_at = event.timestamp
728 1
            for evc in self.get_evcs_by_svc_level():
729 1
                if evc.is_enabled() and not evc.archived:
730 1
                    with evc.lock:
731 1
                        evc.handle_topology_update(
732
                            event.content["topology"].switches
733
                        )
734
735 1
    @listen_to("kytos/topology.link_down")
736 1
    def on_link_down(self, event):
737
        """Change circuit when link is down or under_mantenance."""
738
        self.handle_link_down(event)
739
740 1
    def handle_link_down(self, event):  # pylint: disable=too-many-branches
741
        """Change circuit when link is down or under_mantenance."""
742 1
        link = event.content["link"]
743 1
        log.info("Event handle_link_down %s", link)
744 1
        switch_flows = {}
745 1
        evcs_with_failover = []
746 1
        evcs_normal = []
747 1
        check_failover = []
748 1
        for evc in self.get_evcs_by_svc_level():
749 1
            if evc.is_affected_by_link(link):
750
                # if there is no failover path, handles link down the
751
                # tradditional way
752 1
                if (
753
                    not getattr(evc, 'failover_path', None) or
754
                    evc.is_failover_path_affected_by_link(link)
755
                ):
756 1
                    evcs_normal.append(evc)
757 1
                    continue
758 1
                try:
759 1
                    dpid_flows = evc.get_failover_flows()
760
                # pylint: disable=broad-except
761
                except Exception as err:
762
                    log.error(
763
                        f"Ignore Failover path for {evc} due to error: {err}"
764
                    )
765
                    evcs_normal.append(evc)
766
                    continue
767 1
                for dpid, flows in dpid_flows.items():
768 1
                    switch_flows.setdefault(dpid, [])
769 1
                    switch_flows[dpid].extend(flows)
770 1
                evcs_with_failover.append(evc)
771
            else:
772 1
                check_failover.append(evc)
773
774 1
        while switch_flows:
775 1
            offset = settings.BATCH_SIZE or None
776 1
            switches = list(switch_flows.keys())
777 1
            for dpid in switches:
778 1
                emit_event(
779
                    self.controller,
780
                    context="kytos.flow_manager",
781
                    name="flows.install",
782
                    content={
783
                        "dpid": dpid,
784
                        "flow_dict": {"flows": switch_flows[dpid][:offset]},
785
                    }
786
                )
787 1
                if offset is None or offset >= len(switch_flows[dpid]):
788 1
                    del switch_flows[dpid]
789 1
                    continue
790 1
                switch_flows[dpid] = switch_flows[dpid][offset:]
791 1
            time.sleep(settings.BATCH_INTERVAL)
792
793 1
        for evc in evcs_with_failover:
794 1
            with evc.lock:
795 1
                old_path = evc.current_path
796 1
                evc.current_path = evc.failover_path
797 1
                evc.failover_path = old_path
798 1
                evc.sync()
799 1
            emit_event(self.controller, "redeployed_link_down",
800
                       content=map_evc_event_content(evc))
801 1
            log.info(
802
                f"{evc} redeployed with failover due to link down {link.id}"
803
            )
804
805 1
        for evc in evcs_normal:
806 1
            emit_event(
807
                self.controller,
808
                "evc_affected_by_link_down",
809
                content={"link_id": link.id} | map_evc_event_content(evc)
810
            )
811
812
        # After handling the hot path, check if new failover paths are needed.
813
        # Note that EVCs affected by link down will generate a KytosEvent for
814
        # deployed|redeployed, which will trigger the failover path setup.
815
        # Thus, we just need to further check the check_failover list
816 1
        for evc in check_failover:
817 1
            if evc.is_failover_path_affected_by_link(link):
818 1
                with evc.lock:
819 1
                    evc.setup_failover_path()
820
821 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
822 1
    def on_evc_affected_by_link_down(self, event):
823
        """Change circuit when link is down or under_mantenance."""
824
        self.handle_evc_affected_by_link_down(event)
825
826 1
    def handle_evc_affected_by_link_down(self, event):
827
        """Change circuit when link is down or under_mantenance."""
828 1
        evc = self.circuits.get(event.content["evc_id"])
829 1
        link_id = event.content['link_id']
830 1
        if not evc:
831 1
            return
832 1
        with evc.lock:
833 1
            result = evc.handle_link_down()
834 1
        event_name = "error_redeploy_link_down"
835 1
        if result:
836 1
            log.info(f"{evc} redeployed due to link down {link_id}")
837 1
            event_name = "redeployed_link_down"
838 1
        emit_event(self.controller, event_name,
839
                   content=map_evc_event_content(evc))
840
841 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
842 1
    def on_evc_deployed(self, event):
843
        """Handle EVC deployed|redeployed_link_down."""
844
        self.handle_evc_deployed(event)
845
846 1
    def handle_evc_deployed(self, event):
847
        """Setup failover path on evc deployed."""
848 1
        evc = self.circuits.get(event.content["evc_id"])
849 1
        if not evc:
850 1
            return
851 1
        with evc.lock:
852 1
            evc.setup_failover_path()
853
854 1
    @listen_to("kytos/topology.topology_loaded")
855 1
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
856
        """Load EVCs once the topology is available."""
857
        self.load_all_evcs()
858
859 1
    def load_all_evcs(self):
860
        """Try to load all EVCs on startup."""
861 1
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
862 1
        for circuit_id, circuit in circuits:
863 1
            if circuit_id not in self.circuits:
864 1
                self._load_evc(circuit)
865
866 1
    def _load_evc(self, circuit_dict):
867
        """Load one EVC from mongodb to memory."""
868 1
        try:
869 1
            evc = self._evc_from_dict(circuit_dict)
870 1
        except ValueError as exception:
871 1
            log.error(
872
                f"Could not load EVC: dict={circuit_dict} error={exception}"
873
            )
874 1
            return None
875
876 1
        if evc.archived:
877 1
            return None
878 1
        evc.deactivate()
879 1
        evc.sync()
880 1
        self.circuits.setdefault(evc.id, evc)
881 1
        self.sched.add(evc)
882 1
        return evc
883
884 1
    @listen_to("kytos/flow_manager.flow.error")
885 1
    def on_flow_mod_error(self, event):
886
        """Handle flow mod errors related to an EVC."""
887
        self.handle_flow_mod_error(event)
888
889 1
    def handle_flow_mod_error(self, event):
890
        """Handle flow mod errors related to an EVC."""
891 1
        flow = event.content["flow"]
892 1
        command = event.content.get("error_command")
893 1
        if command != "add":
894
            return
895 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
896 1
        if evc:
897 1
            with evc.lock:
898 1
                evc.remove_current_flows()
899
900 1
    def _evc_dict_with_instances(self, evc_dict):
901
        """Convert some dict values to instance of EVC classes.
902
903
        This method will convert: [UNI, Link]
904
        """
905 1
        data = evc_dict.copy()  # Do not modify the original dict
906 1
        for attribute, value in data.items():
907
            # Get multiple attributes.
908
            # Ex: uni_a, uni_z
909 1
            if "uni" in attribute:
910 1
                try:
911 1
                    data[attribute] = self._uni_from_dict(value)
912 1
                except ValueError as exception:
913 1
                    result = "Error creating UNI: Invalid value"
914 1
                    raise ValueError(result) from exception
915
916 1
            if attribute == "circuit_scheduler":
917 1
                data[attribute] = []
918 1
                for schedule in value:
919 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
920
921
            # Get multiple attributes.
922
            # Ex: primary_links,
923
            #     backup_links,
924
            #     current_links_cache,
925
            #     primary_links_cache,
926
            #     backup_links_cache
927 1
            if "links" in attribute:
928 1
                data[attribute] = [
929
                    self._link_from_dict(link) for link in value
930
                ]
931
932
            # Ex: current_path,
933
            #     primary_path,
934
            #     backup_path
935 1
            if "path" in attribute and attribute != "dynamic_backup_path":
936 1
                data[attribute] = Path(
937
                    [self._link_from_dict(link) for link in value]
938
                )
939
940 1
        return data
941
942 1
    def _evc_from_dict(self, evc_dict):
943 1
        data = self._evc_dict_with_instances(evc_dict)
944 1
        data["table_group"] = self.table_group
945 1
        return EVC(self.controller, **data)
946
947 1
    def _uni_from_dict(self, uni_dict):
948
        """Return a UNI object from python dict."""
949 1
        if uni_dict is None:
950 1
            return False
951
952 1
        interface_id = uni_dict.get("interface_id")
953 1
        interface = self.controller.get_interface_by_id(interface_id)
954 1
        if interface is None:
955 1
            result = (
956
                "Error creating UNI:"
957
                + f"Could not instantiate interface {interface_id}"
958
            )
959 1
            raise ValueError(result) from ValueError
960
961 1
        tag_dict = uni_dict.get("tag", None)
962 1
        if tag_dict:
963 1
            tag = TAG.from_dict(tag_dict)
964
        else:
965 1
            tag = None
966 1
        uni = UNI(interface, tag)
967
968 1
        return uni
969
970 1
    def _link_from_dict(self, link_dict):
971
        """Return a Link object from python dict."""
972 1
        id_a = link_dict.get("endpoint_a").get("id")
973 1
        id_b = link_dict.get("endpoint_b").get("id")
974
975 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
976 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
977 1
        if not endpoint_a:
978 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
979 1
            raise ValueError(error_msg)
980 1
        if not endpoint_b:
981
            error_msg = f"Could not get interface endpoint_b id {id_b}"
982
            raise ValueError(error_msg)
983
984 1
        link = Link(endpoint_a, endpoint_b)
985 1
        if "metadata" in link_dict:
986 1
            link.extend_metadata(link_dict.get("metadata"))
987
988 1
        s_vlan = link.get_metadata("s_vlan")
989 1
        if s_vlan:
990 1
            tag = TAG.from_dict(s_vlan)
991 1
            if tag is False:
992
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
993
                raise ValueError(error_msg)
994 1
            link.update_metadata("s_vlan", tag)
995 1
        return link
996
997 1
    def _find_evc_by_schedule_id(self, schedule_id):
998
        """
999
        Find an EVC and CircuitSchedule based on schedule_id.
1000
1001
        :param schedule_id: Schedule ID
1002
        :return: EVC and Schedule
1003
        """
1004 1
        circuits = self._get_circuits_buffer()
1005 1
        found_schedule = None
1006 1
        evc = None
1007
1008
        # pylint: disable=unused-variable
1009 1
        for c_id, circuit in circuits.items():
1010 1
            for schedule in circuit.circuit_scheduler:
1011 1
                if schedule.id == schedule_id:
1012 1
                    found_schedule = schedule
1013 1
                    evc = circuit
1014 1
                    break
1015 1
            if found_schedule:
1016 1
                break
1017 1
        return evc, found_schedule
1018
1019 1
    def _get_circuits_buffer(self):
1020
        """
1021
        Return the circuit buffer.
1022
1023
        If the buffer is empty, try to load data from mongodb.
1024
        """
1025 1
        if not self.circuits:
1026
            # Load circuits from mongodb to buffer
1027 1
            circuits = self.mongo_controller.get_circuits()['circuits']
1028 1
            for c_id, circuit in circuits.items():
1029 1
                evc = self._evc_from_dict(circuit)
1030 1
                self.circuits[c_id] = evc
1031 1
        return self.circuits
1032
1033
    # pylint: disable=attribute-defined-outside-init
1034 1
    @alisten_to("kytos/of_multi_table.enable_table")
1035 1
    async def on_table_enabled(self, event):
1036
        """Handle a recently table enabled."""
1037 1
        table_group = event.content.get("mef_eline", None)
1038 1
        if not table_group:
1039
            return
1040 1
        for group in table_group:
1041 1
            if group not in settings.TABLE_GROUP_ALLOWED:
1042 1
                log.error(f'The table group "{group}" is not allowed for '
1043
                          f'mef_eline. Allowed table groups are '
1044
                          f'{settings.TABLE_GROUP_ALLOWED}')
1045 1
                return
1046 1
        self.table_group.update(table_group)
1047 1
        content = {"group_table": self.table_group}
1048 1
        name = "kytos/mef_eline.enable_table"
1049
        await aemit_event(self.controller, name, content)
1050