Passed
Pull Request — master (#383)
by
unknown
04:22
created

build.main.Main.handle_topology_update()   B

Complexity

Conditions 8

Size

Total Lines 14
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 8.0877

Importance

Changes 0
Metric Value
cc 8
eloc 12
nop 2
dl 0
loc 14
ccs 8
cts 9
cp 0.8889
crap 8.0877
rs 7.3333
c 0
b 0
f 0
1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from threading import Lock
10
11 1
from pydantic import ValidationError
12
13 1
from kytos.core import KytosNApp, log, rest
14 1
from kytos.core.events import KytosEvent
15 1
from kytos.core.helpers import (alisten_to, listen_to, load_spec,
16
                                validate_openapi)
17 1
from kytos.core.interface import TAG, UNI
18 1
from kytos.core.link import Link
19 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
20
                                 get_json_or_400)
21 1
from napps.kytos.mef_eline import controllers, settings
22 1
from napps.kytos.mef_eline.exceptions import DisabledSwitch, InvalidPath
23 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
24
                                          Path)
25 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
26 1
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
27
                                         emit_event, map_evc_event_content)
28
29
30
# pylint: disable=too-many-public-methods
31 1
class Main(KytosNApp):
32
    """Main class of amlight/mef_eline NApp.
33
34
    This class is the entry point for this napp.
35
    """
36
37 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
38
39 1
    def setup(self):
        """Initialise the NApp state; used in place of ``__init__``.

        The controller calls this method automatically when the application
        is loaded, so every start-up routine belongs here.
        """
        # Scheduler that handles circuit events.
        self.sched = Scheduler()

        # Controller used to save and load circuits; its indexes are
        # bootstrapped right away.
        self.mongo_controller = self.get_eline_controller()
        self.mongo_controller.bootstrap_indexes()

        # Hand the Kytos controller over to the dynamic path manager.
        DynamicPathManager.set_controller(self.controller)

        # In-memory buffer of the EVCs created so far. Every
        # create/update/delete must also be synced to mongodb.
        self.circuits = {}

        self.table_group = dict(epl=0, evpl=0)
        self._lock = Lock()
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

        self.load_all_evcs()
        self._topology_updated_at = None
67
68 1
    def get_evcs_by_svc_level(self) -> list:
69
        """Get circuits sorted by desc service level and asc creation_time.
70
71
        In the future, as more ops are offloaded it should be get from the DB.
72
        """
73 1
        return sorted(self.circuits.values(),
74
                      key=lambda x: (-x.service_level, x.creation_time))
75
76 1
    @staticmethod
    def get_eline_controller():
        """Build and return a new ELineController."""
        eline_controller = controllers.ELineController()
        return eline_controller
80
81 1
    def execute(self):
        """Run one round of the consistency routine.

        The round is skipped, without waiting, when ``self._lock`` is already
        held (by a previous round still running or by another handler that
        uses the same lock). Otherwise ``execute_consistency`` runs while the
        lock is held.
        """
        # A single non-blocking acquire makes "test and take the lock"
        # atomic. Testing locked() first and then entering a blocking
        # ``with`` leaves a window in which another thread can grab the
        # lock, making this call wait instead of skipping the round.
        if not self._lock.acquire(blocking=False):
            return
        try:
            log.debug("Starting consistency routine")
            self.execute_consistency()
        finally:
            # Always release, even if the routine raised.
            self._lock.release()
        log.debug("Finished consistency routine")
89
90 1
    def should_be_checked(self, circuit):
91
        "Verify if the circuit meets the necessary conditions to be checked"
92
        # pylint: disable=too-many-boolean-expressions
93 1
        if (
94
                circuit.is_enabled()
95
                and not circuit.is_active()
96
                and not circuit.lock.locked()
97
                and not circuit.has_recent_removed_flow()
98
                and not circuit.is_recent_updated()
99
                and circuit.are_unis_active(self.controller.switches)
100
                # if a inter-switch EVC does not have current_path, it does not
101
                # make sense to run sdntrace on it
102
                and (circuit.is_intra_switch() or circuit.current_path)
103
                ):
104 1
            return True
105
        return False
106
107 1
    def execute_consistency(self):
        """Bring buffered and stored circuits back to a consistent state.

        Buffered circuits accepted by ``should_be_checked`` are passed to
        ``EVCDeploy.check_list_traces``. The ones reported as checked are
        activated and synced; the others have ``execution_rounds``
        incremented and are redeployed once it exceeds
        ``settings.WAIT_FOR_OLD_PATH``. Circuits present in mongodb but
        missing from the buffer are loaded at the end.
        """
        unloaded = self.mongo_controller.get_circuits()['circuits']
        pending = []
        for circuit in self.get_evcs_by_svc_level():
            # Whatever is already buffered does not have to be loaded later.
            unloaded.pop(circuit.id, None)
            if self.should_be_checked(circuit):
                pending.append(circuit)

        trace_results = EVCDeploy.check_list_traces(pending)
        for circuit in pending:
            if trace_results.get(circuit.id):
                circuit.execution_rounds = 0
                log.info(f"{circuit} enabled but inactive - activating")
                with circuit.lock:
                    circuit.activate()
                    circuit.sync()
                continue

            circuit.execution_rounds += 1
            if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
                log.info(f"{circuit} enabled but inactive - redeploy")
                with circuit.lock:
                    circuit.deploy()

        for circuit_id, stored in unloaded.items():
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
            self._load_evc(stored)
133
134 1
    def shutdown(self):
        """Run when the NApp is unloaded.

        Nothing is done at the moment; any cleanup procedure belongs here.
        """
139
140 1
    @rest("/v2/evc/", methods=["GET"])
    def list_circuits(self, request: Request) -> JSONResponse:
        """Endpoint that returns the stored circuits.

        The ``archived`` query arg, when given, filters the listing
        accordingly; by default only non archived EVCs are listed. With
        ``checkConsistency=true`` every returned circuit also gets a
        ``consistent`` flag. Any other query arg is forwarded as a metadata
        filter.
        """
        log.debug("list_circuits /v2/evc")
        query = request.query_params
        archived = query.get("archived", "false").lower()
        check_consistency = query.get("checkConsistency", "false").lower()

        reserved = {"archived", "checkConsistency"}
        metadata = {
            name: value for name, value in query.items()
            if name not in reserved
        }
        circuits = self.mongo_controller.get_circuits(
            archived=archived,
            metadata=metadata
        )['circuits']

        if check_consistency != 'true':
            return JSONResponse(circuits)

        # Only circuits that are both listed and buffered can be traced.
        buffered = [
            evc for evc_id, evc in self.circuits.items()
            if evc_id in circuits
        ]
        traces = EVCDeploy.check_list_traces(buffered)
        for circuit_id, circuit in circuits.items():
            circuit['consistent'] = traces.get(circuit_id, False)
        return JSONResponse(circuits)
171
172 1
    @rest("/v2/evc/schedule", methods=["GET"])
    def list_schedules(self, _request: Request) -> JSONResponse:
        """Endpoint that returns every schedule stored for every circuit.

        The JSON response follows this template:
        [{"schedule_id": <schedule_id>,
         "circuit_id": <circuit_id>,
         "schedule": <schedule object>}]

        An empty object is returned when no circuit is stored at all.
        """
        log.debug("list_schedules /v2/evc/schedule")
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
        status = 200
        if not circuits:
            return JSONResponse({}, status_code=status)

        result = []
        for circuit in circuits:
            schedules = circuit.get("circuit_scheduler")
            if not schedules:
                continue
            result.extend(
                {
                    "schedule_id": schedule.get("id"),
                    "circuit_id": circuit.get("id"),
                    "schedule": schedule,
                }
                for schedule in schedules
            )

        log.debug("list_schedules result %s %s", result, status)
        return JSONResponse(result, status_code=status)
203
204 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
    def get_circuit(self, request: Request) -> JSONResponse:
        """Endpoint that returns the circuit with the given id.

        Raises HTTPException 404 when the circuit is not found.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("get_circuit /v2/evc/%s", circuit_id)
        circuit = self.mongo_controller.get_circuit(circuit_id)
        if circuit:
            status = 200
            log.debug("get_circuit result %s %s", circuit, status)
            return JSONResponse(circuit, status_code=status)

        result = f"circuit_id {circuit_id} not found"
        log.debug("get_circuit result %s %s", result, 404)
        raise HTTPException(404, detail=result)
217
218 1
    @rest("/v2/evc/", methods=["POST"])
    @validate_openapi(spec)
    def create_circuit(self, request: Request) -> JSONResponse:
        """Try to create a new circuit.

        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
        UNI_Z's requested C-VID are available from the interfaces' pools. This
        is checked when creating the UNI object.

        Then, E-Line NApp requests a primary and a backup path to the
        Pathfinder NApp using the attributes primary_links and backup_links
        submitted via REST

        # For each link composing paths in #3:
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
        #  - Using the S-VID obtained, generate abstract flow entries to be
        #    sent to FlowManager

        Push abstract flow entries to FlowManager and FlowManager pushes
        OpenFlow entries to datapaths

        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
        creation

        Finally, notify user of the status of its request.

        Returns:
            JSONResponse with ``{"circuit_id": <id>}`` and status 201.

        Raises:
            HTTPException: 400 when the payload cannot be turned into an EVC,
                a static path is invalid, the EVC has neither primary nor
                dynamic path, the UNI tags cannot be used or the EVC fails
                validation when synced; 409 when a component is disabled or
                the EVC already exists.
        """
        # Try to create the circuit object
        log.debug("create_circuit /v2/evc/")
        data = get_json_or_400(request, self.controller.loop)

        try:
            evc = self._evc_from_dict(data)
        except ValueError as exception:
            log.debug("create_circuit result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception

        try:
            check_disabled_component(evc.uni_a, evc.uni_z)
        except DisabledSwitch as exception:
            log.debug("create_circuit result %s %s", exception, 409)
            raise HTTPException(
                    409,
                    detail=f"Path is not valid: {exception}"
                ) from exception

        # Primary and backup paths go through the very same validation, so
        # handle them in one loop (primary first, as before).
        for path_name in ("primary_path", "backup_path"):
            static_path = getattr(evc, path_name)
            if not static_path:
                continue
            try:
                static_path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise HTTPException(
                    400,
                    detail=f"{path_name} is not valid: {exception}"
                ) from exception

        # verify duplicated evc
        if self._is_duplicated_evc(evc):
            result = "The EVC already exists."
            log.debug("create_circuit result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        try:
            evc._validate_has_primary_or_dynamic()
        except ValueError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        try:
            self._use_uni_tags(evc)
        except ValueError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        # save circuit
        try:
            evc.sync()
        except ValidationError as exception:
            # The EVC is not going to be stored, so hand back the UNI tags
            # reserved just above; otherwise they stay in use by a circuit
            # that does not exist.
            evc.remove_uni_tags()
            raise HTTPException(400, detail=str(exception)) from exception

        # store circuit in dictionary
        self.circuits[evc.id] = evc

        # Schedule the circuit deploy
        self.sched.add(evc)

        # Circuit has no schedule, deploy now
        if not evc.circuit_scheduler:
            with evc.lock:
                evc.deploy()

        # Notify users
        result = {"circuit_id": evc.id}
        status = 201
        log.debug("create_circuit result %s %s", result, status)
        emit_event(self.controller, name="created",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)
328
329 1
    @staticmethod
330 1
    def _use_uni_tags(evc):
331 1
        uni_a = evc.uni_a
332 1
        try:
333 1
            evc._use_uni_vlan(uni_a)
334 1
        except ValueError as err:
335 1
            raise err
336 1
        try:
337 1
            uni_z = evc.uni_z
338 1
            evc._use_uni_vlan(uni_z)
339 1
        except ValueError as err:
340 1
            evc.make_uni_vlan_available(uni_a)
341 1
            raise err
342
343 1
    @listen_to('kytos/flow_manager.flow.removed')
    def on_flow_delete(self, event):
        """Listen for flow removed events and pass them on.

        The work is delegated to ``handle_flow_delete``, which keeps track of
        when flows got removed.
        """
        self.handle_flow_delete(event)
347
348 1
    def handle_flow_delete(self, event):
        """Record on the owning EVC that one of its flows was removed.

        The EVC id is derived from the cookie of the removed flow; nothing
        happens when no buffered EVC matches it.
        """
        removed_flow = event.content["flow"]
        circuit_id = EVC.get_id_from_cookie(removed_flow.cookie)
        evc = self.circuits.get(circuit_id)
        if not evc:
            return
        log.debug("Flow removed in EVC %s", evc.id)
        evc.set_flow_removed_at()
355
356 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update(self, request: Request) -> JSONResponse:
        """Update a circuit based on payload.

        The EVC attributes (creation_time, active, current_path,
        failover_path, _id, archived) can't be updated.

        After the attributes are updated, an active EVC is removed when it
        got disabled, or removed and deployed again when a redeploy was
        requested; an inactive EVC is deployed when it got enabled.

        Returns:
            JSONResponse with ``{<evc id>: <evc as dict>}`` and status 200.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer; 409
                when it is archived or a component is disabled; 400 when the
                payload is rejected with ValidationError or ValueError.
        """
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = request.path_params["circuit_id"]
        log.debug("update /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("update result %s %s", result, 404)
            # Chain from the caught instance; ``from KeyError`` would chain
            # from a brand new, empty KeyError and drop the real cause.
            raise HTTPException(404, detail=result) from error

        if evc.archived:
            result = "Can't update archived EVC"
            log.debug("update result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        try:
            enable, redeploy = evc.update(
                **self._evc_dict_with_instances(data)
            )
        except ValidationError as exception:
            raise HTTPException(400, detail=str(exception)) from exception
        except ValueError as exception:
            log.error(exception)
            log.debug("update result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception
        except DisabledSwitch as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(
                    409,
                    detail=f"Path is not valid: {exception}"
                ) from exception

        if evc.is_active():
            if enable is False:  # disable if active
                with evc.lock:
                    evc.remove()
            elif redeploy is not None:  # redeploy if active
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        else:
            if enable is True:  # enable if inactive
                with evc.lock:
                    evc.deploy()
        result = {evc.id: evc.as_dict()}
        status = 200

        log.debug("update result %s %s", result, status)
        emit_event(self.controller, "updated",
                   content=map_evc_event_content(evc, **data))
        return JSONResponse(result, status_code=status)
415
416 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
    def delete_circuit(self, request: Request) -> JSONResponse:
        """Remove a circuit.

        First, the flows are removed from the switches, and then the EVC is
        deactivated, disabled, dropped from the scheduler, archived, has its
        UNI tags removed and is synced. All of this happens while holding the
        EVC lock. A "deleted" event is emitted afterwards.

        Returns:
            JSONResponse with a confirmation message and status 200.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer or is
                already archived.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("delete_circuit result %s %s", result, 404)
            # Chain from the caught instance; ``from KeyError`` would chain
            # from a brand new, empty KeyError and drop the real cause.
            raise HTTPException(404, detail=result) from error

        if evc.archived:
            result = f"Circuit {circuit_id} already removed"
            log.debug("delete_circuit result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        log.info("Removing %s", evc)
        with evc.lock:
            evc.remove_current_flows()
            evc.remove_failover_flows(sync=False)
            evc.deactivate()
            evc.disable()
            self.sched.remove(evc)
            evc.archive()
            evc.remove_uni_tags()
            evc.sync()
        log.info("EVC removed. %s", evc)
        result = {"response": f"Circuit {circuit_id} removed"}
        status = 200

        log.debug("delete_circuit result %s %s", result, status)
        emit_event(self.controller, "deleted",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)
455
456 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
    def get_metadata(self, request: Request) -> JSONResponse:
        """Return the metadata of a buffered EVC.

        Raises HTTPException 404 when a KeyError is hit while building the
        response, i.e. the circuit id is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        try:
            payload = {"metadata": self.circuits[circuit_id].metadata}
            return JSONResponse(payload)
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error
469
470 1 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
    @validate_openapi(spec)
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
        """Add the same metadata to several EVCs at once.

        The payload carries ``circuit_ids`` plus the metadata itself. The
        stored EVCs are updated first, then every buffered EVC is extended.
        Ids that hit a KeyError are collected and reported through an
        HTTPException 404.
        """
        data = get_json_or_400(request, self.controller.loop)
        circuit_ids = data.pop("circuit_ids")

        self.mongo_controller.update_evcs(circuit_ids, data, "add")

        missing = []
        for circuit_id in circuit_ids:
            try:
                self.circuits[circuit_id].extend_metadata(data)
            except KeyError:
                missing.append(circuit_id)

        if missing:
            raise HTTPException(404, detail=missing)
        return JSONResponse("Operation successful", status_code=201)
490
491 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
    @validate_openapi(spec)
    def add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to an EVC.

        The request body must be a JSON object; it is merged into the EVC
        metadata and the EVC is synced.

        Returns:
            JSONResponse "Operation successful" with status 201.

        Raises:
            HTTPException: 400 when the body is not a dict; 404 when the
                circuit is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        metadata = get_json_or_400(request, self.controller.loop)
        if not isinstance(metadata, dict):
            # The f prefix was missing, so clients used to receive the
            # literal text "{metadata}" instead of the offending value.
            raise HTTPException(
                400,
                detail=f"Invalid metadata value: {metadata}"
            )
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error

        evc.extend_metadata(metadata)
        evc.sync()
        return JSONResponse("Operation successful", status_code=201)
510
511 1 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
    @validate_openapi(spec)
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
        """Delete one metadata key from several EVCs at once.

        The payload carries ``circuit_ids``. The stored EVCs are updated
        first, then the key is removed from every buffered EVC. Ids that hit
        a KeyError are collected and reported through an HTTPException 404.
        """
        data = get_json_or_400(request, self.controller.loop)
        key = request.path_params["key"]
        circuit_ids = data.pop("circuit_ids")
        self.mongo_controller.update_evcs(circuit_ids, {key: ""}, "del")

        missing = []
        for circuit_id in circuit_ids:
            try:
                self.circuits[circuit_id].remove_metadata(key)
            except KeyError:
                missing.append(circuit_id)

        if missing:
            raise HTTPException(404, detail=missing)
        return JSONResponse("Operation successful")
531
532 1
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
    def delete_metadata(self, request: Request) -> JSONResponse:
        """Remove one metadata key from an EVC and sync the EVC.

        Raises HTTPException 404 when the circuit is not in the buffer.
        """
        params = request.path_params
        circuit_id = params["circuit_id"]
        key = params["key"]
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            not_found = f"circuit_id {circuit_id} not found."
            raise HTTPException(404, detail=not_found) from error

        evc.remove_metadata(key)
        evc.sync()
        return JSONResponse("Operation successful")
548
549 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
    def redeploy(self, request: Request) -> JSONResponse:
        """Endpoint to force the redeployment of an EVC.

        An enabled EVC has its current flows removed and is deployed again
        while holding its lock.

        Returns:
            JSONResponse with status 202 when the redeploy was carried out,
            or with status 409 when the circuit is disabled.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            # Chain from the caught instance; ``from KeyError`` would chain
            # from a brand new, empty KeyError and drop the real cause.
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found"
            ) from error
        if evc.is_enabled():
            with evc.lock:
                evc.remove_current_flows()
                evc.deploy()
            result = {"response": f"Circuit {circuit_id} redeploy received."}
            status = 202
        else:
            result = {"response": f"Circuit {circuit_id} is disabled."}
            status = 409

        return JSONResponse(result, status_code=status)
572
573 1
    @rest("/v2/evc/schedule/", methods=["POST"])
    @validate_openapi(spec)
    def create_schedule(self, request: Request) -> JSONResponse:
        """
        Create a new schedule for a given circuit.

        This service does not check for conflicts with another schedule.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }

        Raises HTTPException 404 when the circuit is not found and 409 when
        it is archived.
        """
        log.debug("create_schedule /v2/evc/schedule/")
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = data["circuit_id"]
        schedule_data = data["schedule"]

        # Look the EVC up in the circuits buffer.
        evc = self._get_circuits_buffer().get(circuit_id)
        if not evc:
            result = f"circuit_id {circuit_id} not found"
            log.debug("create_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # Archived (deleted) circuits can not be modified.
        if evc.archived:
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
            log.debug("create_schedule result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        new_schedule = CircuitSchedule.from_dict(schedule_data)

        # Start the schedule list when the EVC has none yet, then add the
        # new entry to it.
        if not evc.circuit_scheduler:
            evc.circuit_scheduler = []
        evc.circuit_scheduler.append(new_schedule)

        # Register the job and save the circuit to mongodb.
        self.sched.add_circuit_job(evc, new_schedule)
        evc.sync()

        result = new_schedule.as_dict()
        status = 201
        log.debug("create_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)
634
635 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update_schedule(self, request: Request) -> JSONResponse:
        """Update a schedule.

        Replace all attributes of the given schedule of an EVC; the schedule
        ID is preserved.
        Payload example:
            {
              "date": "2019-08-07T14:52:10.967Z",
              "interval": "string",
              "frequency": "1 * * *",
              "action": "create"
            }

        Raises HTTPException 404 when the schedule is not found and 409 when
        its circuit is archived.
        """
        data = get_json_or_400(request, self.controller.loop)
        schedule_id = request.path_params["schedule_id"]
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)

        # Find the schedule and the circuit that owns it.
        evc, old_schedule = self._find_evc_by_schedule_id(schedule_id)
        if not old_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("update_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # Archived (deleted) circuits can not be modified.
        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("update_schedule result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        new_schedule = CircuitSchedule.from_dict(data)
        new_schedule.id = old_schedule.id

        # Swap the old entry for the modified one.
        schedules = evc.circuit_scheduler
        schedules.remove(old_schedule)
        schedules.append(new_schedule)

        # Cancel the old job before registering the new one, then save the
        # EVC to mongodb.
        self.sched.cancel_job(old_schedule.id)
        self.sched.add_circuit_job(evc, new_schedule)
        evc.sync()

        result = new_schedule.as_dict()
        status = 200
        log.debug("update_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)
686
687 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
    def delete_schedule(self, request: Request) -> JSONResponse:
        """Remove a circuit schedule.

        The schedule is removed from its EVC, its job is cancelled and the
        EVC is synced.

        Raises HTTPException 404 when the schedule is not found and 409 when
        its circuit is archived.
        """
        schedule_id = request.path_params["schedule_id"]
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
        evc, old_schedule = self._find_evc_by_schedule_id(schedule_id)

        if not old_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("delete_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # Archived (deleted) circuits can not be modified.
        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("delete_schedule result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        # Drop the schedule, cancel its job and save the EVC to mongodb.
        evc.circuit_scheduler.remove(old_schedule)
        self.sched.cancel_job(old_schedule.id)
        evc.sync()

        result = "Schedule removed"
        status = 200
        log.debug("delete_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)
723
724 1
    def _is_duplicated_evc(self, evc):
725
        """Verify if the circuit given is duplicated with the stored evcs.
726
727
        Args:
728
            evc (EVC): circuit to be analysed.
729
730
        Returns:
731
            boolean: True if the circuit is duplicated, otherwise False.
732
733
        """
734 1
        for circuit in tuple(self.circuits.values()):
735 1
            if not circuit.archived and circuit.shares_uni(evc):
736 1
                return True
737 1
        return False
738
739 1
    @listen_to("kytos/topology.link_up")
    def on_link_up(self, event):
        """Listen for topology link_up events and pass them on.

        The work is delegated to ``handle_link_up``.
        """
        self.handle_link_up(event)
743
744 1
    def handle_link_up(self, event):
        """Notify every enabled, non-archived EVC that a link came up.

        EVCs are visited in the order given by ``get_evcs_by_svc_level``
        and each one is notified while holding its own lock.
        """
        link = event.content["link"]
        log.info("Event handle_link_up %s", link)
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_enabled() or evc.archived:
                continue
            with evc.lock:
                evc.handle_link_up(link)
751
752
    # Possibly replace this with interruptions?
753 1
    @listen_to(
        '.*.switch.interface.(link_up|link_down|created|deleted)',
        pool='dynamic_single'
    )
    def on_interface_link_change(self, event: KytosEvent):
        """Dispatch interface link_up/link_down/created/deleted events.

        The event type is the last dot-separated token of the event name.
        ``link_up`` and ``created`` are routed to
        ``handle_interface_link_up``; ``link_down`` and ``deleted`` are
        routed to ``handle_interface_link_down``. The dispatch runs while
        holding ``self._lock``.
        """
        up_events = ('link_up', 'created')
        down_events = ('link_down', 'deleted')
        with self._lock:
            event_type = event.name.rpartition('.')[2]
            interface = event.content.get("interface")
            if event_type in up_events:
                self.handle_interface_link_up(interface)
                return
            if event_type in down_events:
                self.handle_interface_link_down(interface)
768
769 1
    def handle_interface_link_up(self, interface):
        """Notify every EVC that an interface went up (or was created).

        Args:
            interface: the interface carried by the event, passed as is to
                each EVC's ``handle_interface_link_up``.
        """
        # The message does not depend on the EVC, so log it once per event
        # instead of once per circuit.
        log.info("Event handle_interface_link_up %s", interface)
        for evc in self.get_evcs_by_svc_level():
            evc.handle_interface_link_up(interface)
778
779 1
    def handle_interface_link_down(self, interface):
        """Notify every EVC that an interface went down (or was deleted).

        Args:
            interface: the interface carried by the event, passed as is to
                each EVC's ``handle_interface_link_down``.
        """
        # The message does not depend on the EVC, so log it once per event
        # instead of once per circuit.
        log.info("Event handle_interface_link_down %s", interface)
        for evc in self.get_evcs_by_svc_level():
            evc.handle_interface_link_down(interface)
788
789 1
    @listen_to("kytos/topology.link_down")
    def on_link_down(self, event):
        """Listen to ``kytos/topology.link_down`` events.

        The event is forwarded untouched to ``handle_link_down``.
        """
        self.handle_link_down(event)
793
794 1
    def handle_link_down(self, event):  # pylint: disable=too-many-branches
        """React to a link going down.

        EVCs are split in three groups:

        - affected EVCs with a failover path that is not itself affected:
          their failover flows are sent to flow_manager in batches, then
          current and failover paths are swapped and the EVC is synced;
        - affected EVCs without a usable failover path (or whose failover
          flows could not be computed): an ``evc_affected_by_link_down``
          event is emitted for each of them;
        - unaffected EVCs: their failover path is set up again when the
          failover path is affected by the link.
        """
        link = event.content["link"]
        log.info("Event handle_link_down %s", link)
        pending_flows = {}
        swap_evcs = []
        redeploy_evcs = []
        recheck_failover = []
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_affected_by_link(link):
                recheck_failover.append(evc)
                continue
            # Without a usable failover path the link down is handled the
            # traditional way.
            failover = getattr(evc, 'failover_path', None)
            if not failover or evc.is_failover_path_affected_by_link(link):
                redeploy_evcs.append(evc)
                continue
            try:
                failover_flows = evc.get_failover_flows()
            # pylint: disable=broad-except
            except Exception:
                err = traceback.format_exc().replace("\n", ", ")
                log.error(
                    f"Ignore Failover path for {evc} due to error: {err}"
                )
                redeploy_evcs.append(evc)
                continue
            for dpid, flows in failover_flows.items():
                pending_flows.setdefault(dpid, []).extend(flows)
            swap_evcs.append(evc)

        # Send the failover flows in rounds: each round sends at most
        # BATCH_SIZE flows per switch (all of them when BATCH_SIZE is
        # falsy), then waits BATCH_INTERVAL.
        while pending_flows:
            batch_size = settings.BATCH_SIZE or None
            for dpid in list(pending_flows):
                queued = pending_flows[dpid]
                emit_event(
                    self.controller,
                    context="kytos.flow_manager",
                    name="flows.install",
                    content={
                        "dpid": dpid,
                        "flow_dict": {"flows": queued[:batch_size]},
                    }
                )
                if batch_size is not None and batch_size < len(queued):
                    pending_flows[dpid] = queued[batch_size:]
                else:
                    del pending_flows[dpid]
            time.sleep(settings.BATCH_INTERVAL)

        for evc in swap_evcs:
            with evc.lock:
                previous_path = evc.current_path
                evc.current_path = evc.failover_path
                evc.failover_path = previous_path
                evc.sync()
            emit_event(self.controller, "redeployed_link_down",
                       content=map_evc_event_content(evc))
            log.info(
                f"{evc} redeployed with failover due to link down {link.id}"
            )

        for evc in redeploy_evcs:
            payload = {"link_id": link.id} | map_evc_event_content(evc)
            emit_event(
                self.controller,
                "evc_affected_by_link_down",
                content=payload
            )

        # After handling the hot path, check if new failover paths are needed.
        # Note that EVCs affected by link down will generate a KytosEvent for
        # deployed|redeployed, which will trigger the failover path setup.
        # Thus, only the EVCs not affected by the link are checked here.
        for evc in recheck_failover:
            if not evc.is_failover_path_affected_by_link(link):
                continue
            with evc.lock:
                evc.setup_failover_path()
875
876 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
    def on_evc_affected_by_link_down(self, event):
        """Listen to ``kytos/mef_eline.evc_affected_by_link_down`` events.

        The event is forwarded untouched to
        ``handle_evc_affected_by_link_down``.
        """
        self.handle_evc_affected_by_link_down(event)
880
881 1
    def handle_evc_affected_by_link_down(self, event):
        """Let one EVC handle a link down and report the outcome.

        The EVC is looked up by ``evc_id`` from the event content; nothing
        happens when it is not known. Otherwise ``handle_link_down`` is
        called on it under its lock and a ``redeployed_link_down`` event is
        emitted on success, ``error_redeploy_link_down`` otherwise.
        """
        content = event.content
        evc = self.circuits.get(content["evc_id"])
        link_id = content['link_id']
        if not evc:
            return
        with evc.lock:
            redeployed = evc.handle_link_down()
        if redeployed:
            log.info(f"{evc} redeployed due to link down {link_id}")
            outcome = "redeployed_link_down"
        else:
            outcome = "error_redeploy_link_down"
        emit_event(self.controller, outcome,
                   content=map_evc_event_content(evc))
895
896 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
    def on_evc_deployed(self, event):
        """Listen to EVC deployed and redeployed_link_up|down events.

        The event is forwarded untouched to ``handle_evc_deployed``.
        """
        self.handle_evc_deployed(event)
900
901 1
    def handle_evc_deployed(self, event):
        """Set up the failover path of the EVC named in the event.

        The EVC is looked up by ``evc_id`` from the event content; unknown
        ids are ignored. The setup runs while holding the EVC lock.
        """
        evc = self.circuits.get(event.content["evc_id"])
        if evc:
            with evc.lock:
                evc.setup_failover_path()
908
909 1
    @listen_to("kytos/topology.topology_loaded")
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
        """Listen to ``kytos/topology.topology_loaded`` and load all EVCs.

        The event itself is not inspected.
        """
        self.load_all_evcs()
913
914 1
    def load_all_evcs(self):
        """Load from mongodb every circuit that is not in memory yet.

        Circuits whose id is already a key of ``self.circuits`` are skipped.
        """
        stored = self.mongo_controller.get_circuits()['circuits']
        for circuit_id, circuit_dict in stored.items():
            if circuit_id in self.circuits:
                continue
            self._load_evc(circuit_dict)
920
921 1
    def _load_evc(self, circuit_dict):
922
        """Load one EVC from mongodb to memory."""
923 1
        try:
924 1
            evc = self._evc_from_dict(circuit_dict)
925 1
        except ValueError as exception:
926 1
            log.error(
927
                f"Could not load EVC: dict={circuit_dict} error={exception}"
928
            )
929 1
            return None
930
931 1
        if evc.archived:
932 1
            return None
933
934 1
        self.circuits.setdefault(evc.id, evc)
935 1
        self.sched.add(evc)
936 1
        return evc
937
938 1
    @listen_to("kytos/flow_manager.flow.error")
    def on_flow_mod_error(self, event):
        """Listen to ``kytos/flow_manager.flow.error`` events.

        The event is forwarded untouched to ``handle_flow_mod_error``.
        """
        self.handle_flow_mod_error(event)
942
943 1
    def handle_flow_mod_error(self, event):
        """Remove the current flows of the EVC owning a failed flow add.

        Only errors whose ``error_command`` is ``"add"`` are handled. The
        EVC is resolved from the flow cookie; unknown EVCs are ignored.
        """
        content = event.content
        flow = content["flow"]
        if content.get("error_command") != "add":
            return
        evc_id = EVC.get_id_from_cookie(flow.cookie)
        evc = self.circuits.get(evc_id)
        if not evc:
            return
        with evc.lock:
            evc.remove_current_flows()
953
954 1
    def _evc_dict_with_instances(self, evc_dict):
955
        """Convert some dict values to instance of EVC classes.
956
957
        This method will convert: [UNI, Link]
958
        """
959 1
        data = evc_dict.copy()  # Do not modify the original dict
960 1
        for attribute, value in data.items():
961
            # Get multiple attributes.
962
            # Ex: uni_a, uni_z
963 1
            if "uni" in attribute:
964 1
                try:
965 1
                    data[attribute] = self._uni_from_dict(value)
966 1
                except ValueError as exception:
967 1
                    result = "Error creating UNI: Invalid value"
968 1
                    raise ValueError(result) from exception
969
970 1
            if attribute == "circuit_scheduler":
971 1
                data[attribute] = []
972 1
                for schedule in value:
973 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
974
975
            # Get multiple attributes.
976
            # Ex: primary_links,
977
            #     backup_links,
978
            #     current_links_cache,
979
            #     primary_links_cache,
980
            #     backup_links_cache
981 1
            if "links" in attribute:
982 1
                data[attribute] = [
983
                    self._link_from_dict(link) for link in value
984
                ]
985
986
            # Ex: current_path,
987
            #     primary_path,
988
            #     backup_path
989 1
            if "path" in attribute and attribute != "dynamic_backup_path":
990 1
                data[attribute] = Path(
991
                    [self._link_from_dict(link) for link in value]
992
                )
993
994 1
        return data
995
996 1
    def _evc_from_dict(self, evc_dict):
        """Build an EVC from its dict representation.

        The dict values are first converted by ``_evc_dict_with_instances``
        and ``table_group`` is set to this NApp's ``table_group``.
        """
        evc_kwargs = self._evc_dict_with_instances(evc_dict)
        evc_kwargs["table_group"] = self.table_group
        return EVC(self.controller, **evc_kwargs)
1000
1001 1
    def _uni_from_dict(self, uni_dict):
1002
        """Return a UNI object from python dict."""
1003 1
        if uni_dict is None:
1004 1
            return False
1005
1006 1
        interface_id = uni_dict.get("interface_id")
1007 1
        interface = self.controller.get_interface_by_id(interface_id)
1008 1
        if interface is None:
1009 1
            result = (
1010
                "Error creating UNI:"
1011
                + f"Could not instantiate interface {interface_id}"
1012
            )
1013 1
            raise ValueError(result) from ValueError
1014 1
        tag_convert = {1: "vlan"}
1015 1
        tag_dict = uni_dict.get("tag", None)
1016 1
        if tag_dict:
1017 1
            tag_type = tag_dict.get("tag_type")
1018 1
            tag_type = tag_convert.get(tag_type, tag_type)
1019 1
            tag_value = tag_dict.get("value")
1020 1
            tag = TAG(tag_type, tag_value)
1021
        else:
1022 1
            tag = None
1023 1
        uni = UNI(interface, tag)
1024
1025 1
        return uni
1026
1027 1
    def _link_from_dict(self, link_dict):
1028
        """Return a Link object from python dict."""
1029 1
        id_a = link_dict.get("endpoint_a").get("id")
1030 1
        id_b = link_dict.get("endpoint_b").get("id")
1031
1032 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
1033 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
1034 1
        if not endpoint_a:
1035 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
1036 1
            raise ValueError(error_msg)
1037 1
        if not endpoint_b:
1038
            error_msg = f"Could not get interface endpoint_b id {id_b}"
1039
            raise ValueError(error_msg)
1040
1041 1
        link = Link(endpoint_a, endpoint_b)
1042 1
        if "metadata" in link_dict:
1043 1
            link.extend_metadata(link_dict.get("metadata"))
1044
1045 1
        s_vlan = link.get_metadata("s_vlan")
1046 1
        if s_vlan:
1047 1
            tag = TAG.from_dict(s_vlan)
1048 1
            if tag is False:
1049
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
1050
                raise ValueError(error_msg)
1051 1
            link.update_metadata("s_vlan", tag)
1052 1
        return link
1053
1054 1
    def _find_evc_by_schedule_id(self, schedule_id):
1055
        """
1056
        Find an EVC and CircuitSchedule based on schedule_id.
1057
1058
        :param schedule_id: Schedule ID
1059
        :return: EVC and Schedule
1060
        """
1061 1
        circuits = self._get_circuits_buffer()
1062 1
        found_schedule = None
1063 1
        evc = None
1064
1065
        # pylint: disable=unused-variable
1066 1
        for c_id, circuit in circuits.items():
1067 1
            for schedule in circuit.circuit_scheduler:
1068 1
                if schedule.id == schedule_id:
1069 1
                    found_schedule = schedule
1070 1
                    evc = circuit
1071 1
                    break
1072 1
            if found_schedule:
1073 1
                break
1074 1
        return evc, found_schedule
1075
1076 1
    def _get_circuits_buffer(self):
1077
        """
1078
        Return the circuit buffer.
1079
1080
        If the buffer is empty, try to load data from mongodb.
1081
        """
1082 1
        if not self.circuits:
1083
            # Load circuits from mongodb to buffer
1084 1
            circuits = self.mongo_controller.get_circuits()['circuits']
1085 1
            for c_id, circuit in circuits.items():
1086 1
                evc = self._evc_from_dict(circuit)
1087 1
                self.circuits[c_id] = evc
1088 1
        return self.circuits
1089
1090
    # pylint: disable=attribute-defined-outside-init
1091 1
    @alisten_to("kytos/of_multi_table.enable_table")
    async def on_table_enabled(self, event):
        """Apply the table groups sent for mef_eline and announce them.

        The ``mef_eline`` entry of the event content is merged into
        ``self.table_group`` and a ``kytos/mef_eline.enable_table`` event is
        emitted. Nothing is changed when the entry is missing or empty, or
        when any of its groups is not in ``settings.TABLE_GROUP_ALLOWED``
        (an error is logged in that case).
        """
        requested = event.content.get("mef_eline", None)
        if not requested:
            return
        for group in requested:
            if group in settings.TABLE_GROUP_ALLOWED:
                continue
            log.error(f'The table group "{group}" is not allowed for '
                      f'mef_eline. Allowed table groups are '
                      f'{settings.TABLE_GROUP_ALLOWED}')
            return
        self.table_group.update(requested)
        await aemit_event(
            self.controller,
            "kytos/mef_eline.enable_table",
            {"group_table": self.table_group},
        )
1107