Passed
Pull Request — master (#371)
by
unknown
03:30
created

build.main.Main.list_circuits()   A

Complexity

Conditions 1

Size

Total Lines 15
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 10
nop 2
dl 0
loc 15
ccs 9
cts 9
cp 1
crap 1
rs 9.9
c 0
b 0
f 0
1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
from threading import Lock
9
10 1
from pydantic import ValidationError
11
12 1
from kytos.core import KytosNApp, log, rest
13 1
from kytos.core.helpers import (alisten_to, listen_to, load_spec,
14
                                validate_openapi)
15 1
from kytos.core.interface import TAG, UNI
16 1
from kytos.core.link import Link
17 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
18
                                 get_json_or_400)
19 1
from napps.kytos.mef_eline import controllers, settings
20 1
from napps.kytos.mef_eline.exceptions import DisabledSwitch, InvalidPath
21 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
22
                                          Path)
23 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
24 1
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
25
                                         emit_event, map_evc_event_content)
26
27
28
# pylint: disable=too-many-public-methods
29 1
class Main(KytosNApp):
30
    """Main class of amlight/mef_eline NApp.
31
32
    This class is the entry point for this napp.
33
    """
34
35 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
36
37 1
    def setup(self):
        """Initialise the NApp state; stands in for ``__init__``.

        The controller calls this method automatically when the
        application is loaded, so every setup routine lives here.
        """
        # Scheduler used for the circuit events.
        self.sched = Scheduler()

        # Controller used to save and load circuits; its indexes are
        # bootstrapped right away.
        self.mongo_controller = self.get_eline_controller()
        self.mongo_controller.bootstrap_indexes()

        # Hand the Kytos controller over to the dynamic path manager.
        DynamicPathManager.set_controller(self.controller)

        # In-memory buffer of the EVCs created. Every create/update/delete
        # must also be synced to mongodb.
        self.circuits = {}

        self.table_group = dict(epl=0, evpl=0)
        self._lock = Lock()
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

        self.load_all_evcs()
        self._topology_updated_at = None
65
66 1
    def get_evcs_by_svc_level(self) -> list:
67
        """Get circuits sorted by desc service level and asc creation_time.
68
69
        In the future, as more ops are offloaded it should be get from the DB.
70
        """
71 1
        return sorted(self.circuits.values(),
72
                      key=lambda x: (-x.service_level, x.creation_time))
73
74 1
    @staticmethod
    def get_eline_controller():
        """Build and return a new ``ELineController`` instance."""
        eline_controller = controllers.ELineController()
        return eline_controller
78
79 1
    def execute(self):
        """Run the consistency routine unless one is already in progress.

        When ``self._lock`` is currently held the call returns right away
        without doing anything.
        """
        if not self._lock.locked():
            log.debug("Starting consistency routine")
            with self._lock:
                self.execute_consistency()
            log.debug("Finished consistency routine")
87
88 1
    def should_be_checked(self, circuit):
89
        "Verify if the circuit meets the necessary conditions to be checked"
90
        # pylint: disable=too-many-boolean-expressions
91 1
        if (
92
                circuit.is_enabled()
93
                and not circuit.is_active()
94
                and not circuit.lock.locked()
95
                and not circuit.has_recent_removed_flow()
96
                and not circuit.is_recent_updated()
97
                and circuit.are_unis_active(self.controller.switches)
98
                # if a inter-switch EVC does not have current_path, it does not
99
                # make sense to run sdntrace on it
100
                and (circuit.is_intra_switch() or circuit.current_path)
101
                ):
102 1
            return True
103
        return False
104
105 1
    def execute_consistency(self):
        """Run one round of the consistency routine.

        Circuits that qualify (see ``should_be_checked``) are traced in a
        single batch: the ones whose trace succeeds are activated and synced,
        the others are redeployed once they have failed more rounds than
        ``settings.WAIT_FOR_OLD_PATH``. Circuits found in mongodb but missing
        from the in-memory buffer are loaded at the end.
        """
        circuits_to_check = []
        # Whatever is left in this dict after the loop is stored in mongodb
        # but not loaded in memory.
        unloaded = self.mongo_controller.get_circuits()['circuits']
        for circuit in self.get_evcs_by_svc_level():
            unloaded.pop(circuit.id, None)
            if self.should_be_checked(circuit):
                circuits_to_check.append(circuit)

        trace_results = EVCDeploy.check_list_traces(circuits_to_check)
        for circuit in circuits_to_check:
            if trace_results.get(circuit.id):
                circuit.execution_rounds = 0
                log.info(f"{circuit} enabled but inactive - activating")
                with circuit.lock:
                    circuit.activate()
                    circuit.sync()
                continue

            circuit.execution_rounds += 1
            if circuit.execution_rounds <= settings.WAIT_FOR_OLD_PATH:
                continue
            log.info(f"{circuit} enabled but inactive - redeploy")
            with circuit.lock:
                circuit.deploy()

        for circuit_id, stored_evc in unloaded.items():
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
            self._load_evc(stored_evc)
131
132 1
    def shutdown(self):
133
        """Execute when your napp is unloaded.
134
135
        If you have some cleanup procedure, insert it here.
136
        """
137
138 1
    @rest("/v2/evc/", methods=["GET"])
    def list_circuits(self, request: Request) -> JSONResponse:
        """Endpoint that returns the stored circuits.

        The ``archived`` query argument (``"false"`` when absent) is
        lower-cased and passed to the storage lookup; every other query
        argument is forwarded as a metadata filter.
        """
        log.debug("list_circuits /v2/evc")
        query = request.query_params
        archived = query.get("archived", "false").lower()
        metadata = {
            key: value for key, value in query.items() if key != "archived"
        }
        stored = self.mongo_controller.get_circuits(archived=archived,
                                                    metadata=metadata)
        return JSONResponse(stored['circuits'])
153
154 1
    @rest("/v2/evc/schedule", methods=["GET"])
    def list_schedules(self, _request: Request) -> JSONResponse:
        """Endpoint that returns every schedule of every stored circuit.

        The response is a JSON list following this template:
        [{"schedule_id": <schedule_id>,
         "circuit_id": <circuit_id>,
         "schedule": <schedule object>}]

        When no circuit is stored at all an empty JSON object is returned.
        """
        log.debug("list_schedules /v2/evc/schedule")
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
        if not circuits:
            return JSONResponse({}, status_code=200)

        status = 200
        result = [
            {
                "schedule_id": scheduler.get("id"),
                "circuit_id": circuit.get("id"),
                "schedule": scheduler,
            }
            for circuit in circuits
            # circuits without (or with an empty) scheduler contribute nothing
            for scheduler in circuit.get("circuit_scheduler") or ()
        ]

        log.debug("list_schedules result %s %s", result, status)
        return JSONResponse(result, status_code=status)
185
186 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
    def get_circuit(self, request: Request) -> JSONResponse:
        """Endpoint that returns one stored circuit, looked up by its id.

        Raises:
            HTTPException: 404 when the storage has no such circuit.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("get_circuit /v2/evc/%s", circuit_id)
        circuit = self.mongo_controller.get_circuit(circuit_id)
        if circuit:
            log.debug("get_circuit result %s %s", circuit, 200)
            return JSONResponse(circuit, status_code=200)

        message = f"circuit_id {circuit_id} not found"
        log.debug("get_circuit result %s %s", message, 404)
        raise HTTPException(404, detail=message)
199
200 1
    @rest("/v2/evc/", methods=["POST"])
    @validate_openapi(spec)
    def create_circuit(self, request: Request) -> JSONResponse:
        """Try to create a new circuit from the request payload.

        Steps, in order:

        1. Build the EVC from the JSON body.
        2. Check the UNIs with ``check_disabled_component``.
        3. Validate ``primary_path`` and ``backup_path`` when they are set.
        4. Reject the request when an equivalent EVC already exists.
        5. Check that the EVC has a primary or a dynamic path.
        6. Reserve the UNI tags and save the circuit.
        7. Keep the circuit in the buffer, schedule it and, when it has no
           ``circuit_scheduler``, deploy it right away.
        8. Emit the ``created`` event and answer with the circuit id.

        Returns:
            JSONResponse: ``{"circuit_id": <id>}`` with status 201.

        Raises:
            HTTPException: 400 when the payload, a path, the UNI tags or the
                stored document are invalid; 409 on ``DisabledSwitch`` or
                when the EVC is duplicated.
        """
        # Try to create the circuit object
        log.debug("create_circuit /v2/evc/")
        data = get_json_or_400(request, self.controller.loop)

        try:
            evc = self._evc_from_dict(data)
        except ValueError as exception:
            log.debug("create_circuit result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception

        try:
            check_disabled_component(evc.uni_a, evc.uni_z)
        except DisabledSwitch as exception:
            log.debug("create_circuit result %s %s", exception, 409)
            raise HTTPException(
                    409,
                    detail=f"Path is not valid: {exception}"
                ) from exception

        # Both static paths go through the very same validation; only the
        # name reported in the error message differs.
        for path_name in ("primary_path", "backup_path"):
            path = getattr(evc, path_name)
            if not path:
                continue
            try:
                path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise HTTPException(
                    400,
                    detail=f"{path_name} is not valid: {exception}"
                ) from exception

        # verify duplicated evc
        if self._is_duplicated_evc(evc):
            result = "The EVC already exists."
            log.debug("create_circuit result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        try:
            evc._validate_has_primary_or_dynamic()
        except ValueError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        try:
            self._use_uni_tags(evc)
        except ValueError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        # save circuit
        try:
            evc.sync()
        except ValidationError as exception:
            # The circuit is rejected, so the tags reserved just above must
            # be given back; otherwise they would stay in use by an EVC that
            # was never stored.
            evc.remove_uni_tags()
            raise HTTPException(400, detail=str(exception)) from exception

        # store circuit in dictionary
        self.circuits[evc.id] = evc

        # Schedule the circuit deploy
        self.sched.add(evc)

        # Circuit has no schedule, deploy now
        if not evc.circuit_scheduler:
            with evc.lock:
                evc.deploy()

        # Notify users
        result = {"circuit_id": evc.id}
        status = 201
        log.debug("create_circuit result %s %s", result, status)
        emit_event(self.controller, name="created",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)
310
311 1
    @staticmethod
312 1
    def _use_uni_tags(evc):
313 1
        uni_a = evc.uni_a
314 1
        try:
315 1
            evc._use_uni_vlan(uni_a)
316 1
        except ValueError as err:
317 1
            raise err
318 1
        try:
319 1
            uni_z = evc.uni_z
320 1
            evc._use_uni_vlan(uni_z)
321 1
        except ValueError as err:
322 1
            evc.make_uni_vlan_available(uni_a)
323 1
            raise err
324
325 1
    @listen_to('kytos/flow_manager.flow.removed')
    def on_flow_delete(self, event):
        """Listen to flow removals and hand them to ``handle_flow_delete``."""
        self.handle_flow_delete(event)
329
330 1
    def handle_flow_delete(self, event):
        """Record on the owning EVC that one of its flows was removed.

        The EVC id is derived from the cookie of the removed flow; flows
        that do not map to a buffered EVC are ignored.
        """
        cookie = event.content["flow"].cookie
        evc = self.circuits.get(EVC.get_id_from_cookie(cookie))
        if not evc:
            return
        log.debug("Flow removed in EVC %s", evc.id)
        evc.set_flow_removed_at()
337
338 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update(self, request: Request) -> JSONResponse:
        """Update a circuit based on payload.

        The EVC attributes (creation_time, active, current_path,
        failover_path, _id, archived) can't be updated.

        After the attributes are updated, an active circuit is removed when
        the update disables it, or removed and deployed again when the
        update asks for a redeploy; an inactive circuit is deployed when the
        update enables it.

        Returns:
            JSONResponse: ``{<circuit id>: <circuit as dict>}``, status 200.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer, 409
                when it is archived or on ``DisabledSwitch``, 400 when the
                payload is rejected.
        """
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = request.path_params["circuit_id"]
        log.debug("update /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("update result %s %s", result, 404)
            # Chain from the caught instance (not the KeyError class) so the
            # missing key is kept as the cause.
            raise HTTPException(404, detail=result) from error

        if evc.archived:
            result = "Can't update archived EVC"
            log.debug("update result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        try:
            enable, redeploy = evc.update(
                **self._evc_dict_with_instances(data)
            )
        except ValidationError as exception:
            raise HTTPException(400, detail=str(exception)) from exception
        except ValueError as exception:
            log.error(exception)
            log.debug("update result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception
        except DisabledSwitch as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(
                    409,
                    detail=f"Path is not valid: {exception}"
                ) from exception

        if evc.is_active():
            if enable is False:  # disable if active
                with evc.lock:
                    evc.remove()
            elif redeploy is not None:  # redeploy if active
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        else:
            if enable is True:  # enable if inactive
                with evc.lock:
                    evc.deploy()
        result = {evc.id: evc.as_dict()}
        status = 200

        log.debug("update result %s %s", result, status)
        emit_event(self.controller, "updated",
                   content=map_evc_event_content(evc, **data))
        return JSONResponse(result, status_code=status)
397
398 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
    def delete_circuit(self, request: Request) -> JSONResponse:
        """Remove a circuit.

        With the EVC lock held, the current and failover flows are removed,
        the EVC is deactivated and disabled, its schedule is dropped, and it
        is archived, stripped of its UNI tags and synced. A ``deleted``
        event is emitted afterwards.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer or was
                already archived.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("delete_circuit result %s %s", result, 404)
            # Chain from the caught instance (not the KeyError class) so the
            # missing key is kept as the cause.
            raise HTTPException(404, detail=result) from error

        if evc.archived:
            result = f"Circuit {circuit_id} already removed"
            log.debug("delete_circuit result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        log.info("Removing %s", evc)
        with evc.lock:
            evc.remove_current_flows()
            evc.remove_failover_flows(sync=False)
            evc.deactivate()
            evc.disable()
            self.sched.remove(evc)
            evc.archive()
            evc.remove_uni_tags()
            evc.sync()
        log.info("EVC removed. %s", evc)
        result = {"response": f"Circuit {circuit_id} removed"}
        status = 200

        log.debug("delete_circuit result %s %s", result, status)
        emit_event(self.controller, "deleted",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)
437
438 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
    def get_metadata(self, request: Request) -> JSONResponse:
        """Return the metadata of a buffered EVC.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        try:
            metadata = self.circuits[circuit_id].metadata
            return JSONResponse({"metadata": metadata})
        except KeyError as error:
            message = f"circuit_id {circuit_id} not found."
            raise HTTPException(404, detail=message) from error
451
452 1 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
    @validate_openapi(spec)
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
        """Add the same metadata to several EVCs at once.

        ``circuit_ids`` is taken out of the payload and the rest of the
        payload is the metadata. The storage is updated first, then every
        buffered EVC is extended.

        Raises:
            HTTPException: 404 listing the ids that raised ``KeyError``.
        """
        metadata = get_json_or_400(request, self.controller.loop)
        circuit_ids = metadata.pop("circuit_ids")

        self.mongo_controller.update_evcs(circuit_ids, metadata, "add")

        missing = []
        for circuit_id in circuit_ids:
            try:
                self.circuits[circuit_id].extend_metadata(metadata)
            except KeyError:
                missing.append(circuit_id)

        if missing:
            raise HTTPException(404, detail=missing)
        return JSONResponse("Operation successful", status_code=201)
472
473 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
    @validate_openapi(spec)
    def add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to an EVC.

        The JSON body must be an object; it is merged into the metadata of
        the buffered EVC, which is then synced.

        Raises:
            HTTPException: 400 when the body is not a JSON object, 404 when
                the circuit is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        metadata = get_json_or_400(request, self.controller.loop)
        if not isinstance(metadata, dict):
            # The message is an f-string so the rejected value is actually
            # reported instead of the literal "{metadata}" placeholder.
            raise HTTPException(
                400,
                detail=f"Invalid metadata value: {metadata}"
            )
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error

        evc.extend_metadata(metadata)
        evc.sync()
        return JSONResponse("Operation successful", status_code=201)
492
493 1 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
    @validate_openapi(spec)
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
        """Delete one metadata key from several EVCs at once.

        The storage is updated first, then the key is removed from every
        buffered EVC listed in ``circuit_ids``.

        Raises:
            HTTPException: 404 listing the ids that raised ``KeyError``.
        """
        payload = get_json_or_400(request, self.controller.loop)
        key = request.path_params["key"]
        circuit_ids = payload.pop("circuit_ids")
        self.mongo_controller.update_evcs(circuit_ids, {key: ""}, "del")

        missing = []
        for circuit_id in circuit_ids:
            try:
                self.circuits[circuit_id].remove_metadata(key)
            except KeyError:
                missing.append(circuit_id)

        if missing:
            raise HTTPException(404, detail=missing)
        return JSONResponse("Operation successful")
513
514 1
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
    def delete_metadata(self, request: Request) -> JSONResponse:
        """Delete one metadata key from an EVC and sync the EVC.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer.
        """
        params = request.path_params
        circuit_id = params["circuit_id"]
        key = params["key"]
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            message = f"circuit_id {circuit_id} not found."
            raise HTTPException(404, detail=message) from error

        evc.remove_metadata(key)
        evc.sync()
        return JSONResponse("Operation successful")
530
531 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
    def redeploy(self, request: Request) -> JSONResponse:
        """Endpoint to force the redeployment of an EVC.

        An enabled EVC gets its current flows removed and is deployed again
        (status 202); a disabled EVC is left untouched (status 409).

        Raises:
            HTTPException: 404 when the circuit is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            # Chain from the caught instance (not the KeyError class) so the
            # missing key is kept as the cause.
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found"
            ) from error
        if evc.is_enabled():
            with evc.lock:
                evc.remove_current_flows()
                evc.deploy()
            result = {"response": f"Circuit {circuit_id} redeploy received."}
            status = 202
        else:
            result = {"response": f"Circuit {circuit_id} is disabled."}
            status = 409

        return JSONResponse(result, status_code=status)
554
555 1
    @rest("/v2/evc/schedule/", methods=["POST"])
    @validate_openapi(spec)
    def create_schedule(self, request: Request) -> JSONResponse:
        """Create a new schedule for a given circuit.

        Conflicts with other schedules are not checked.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }

        Raises:
            HTTPException: 404 when the circuit is not found, 409 when it
                is archived.
        """
        log.debug("create_schedule /v2/evc/schedule/")
        payload = get_json_or_400(request, self.controller.loop)
        circuit_id, schedule_data = payload["circuit_id"], payload["schedule"]

        # Look the circuit up in the circuits buffer.
        evc = self._get_circuits_buffer().get(circuit_id)
        if not evc:
            message = f"circuit_id {circuit_id} not found"
            log.debug("create_schedule result %s %s", message, 404)
            raise HTTPException(404, detail=message)

        # Archived circuits can not be modified.
        if evc.archived:
            message = f"Circuit {circuit_id} is archived. Update is forbidden."
            log.debug("create_schedule result %s %s", message, 409)
            raise HTTPException(409, detail=message)

        new_schedule = CircuitSchedule.from_dict(schedule_data)

        # The schedule list is created on demand.
        if not evc.circuit_scheduler:
            evc.circuit_scheduler = []
        evc.circuit_scheduler.append(new_schedule)

        # Register the job, then save the circuit to mongodb.
        self.sched.add_circuit_job(evc, new_schedule)
        evc.sync()

        result = new_schedule.as_dict()
        log.debug("create_schedule result %s %s", result, 201)
        return JSONResponse(result, status_code=201)
616
617 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update_schedule(self, request: Request) -> JSONResponse:
        """Replace a schedule of an EVC, keeping the schedule id.

        A new schedule is built from the payload, receives the id of the
        one it replaces, and takes its place both in the EVC and in the
        scheduler jobs.
        Payload example:
            {
              "date": "2019-08-07T14:52:10.967Z",
              "interval": "string",
              "frequency": "1 * * *",
              "action": "create"
            }

        Raises:
            HTTPException: 404 when the schedule is not found, 409 when its
                circuit is archived.
        """
        payload = get_json_or_400(request, self.controller.loop)
        schedule_id = request.path_params["schedule_id"]
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)

        # Find the schedule and the circuit that owns it.
        evc, old_schedule = self._find_evc_by_schedule_id(schedule_id)
        if not old_schedule:
            message = f"schedule_id {schedule_id} not found"
            log.debug("update_schedule result %s %s", message, 404)
            raise HTTPException(404, detail=message)

        # Archived circuits can not be modified.
        if evc.archived:
            message = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("update_schedule result %s %s", message, 409)
            raise HTTPException(409, detail=message)

        new_schedule = CircuitSchedule.from_dict(payload)
        new_schedule.id = old_schedule.id

        # Swap the schedules in the circuit.
        evc.circuit_scheduler.remove(old_schedule)
        evc.circuit_scheduler.append(new_schedule)

        # Swap the jobs in the scheduler.
        self.sched.cancel_job(old_schedule.id)
        self.sched.add_circuit_job(evc, new_schedule)

        # Save the EVC to mongodb.
        evc.sync()

        result = new_schedule.as_dict()
        log.debug("update_schedule result %s %s", result, 200)
        return JSONResponse(result, status_code=200)
668
669 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
    def delete_schedule(self, request: Request) -> JSONResponse:
        """Remove a circuit schedule.

        The schedule is removed from its EVC, its job is cancelled in the
        scheduler and the EVC is synced.

        Raises:
            HTTPException: 404 when the schedule is not found, 409 when its
                circuit is archived.
        """
        schedule_id = request.path_params["schedule_id"]
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
        evc, schedule = self._find_evc_by_schedule_id(schedule_id)

        if not schedule:
            message = f"schedule_id {schedule_id} not found"
            log.debug("delete_schedule result %s %s", message, 404)
            raise HTTPException(404, detail=message)

        # Archived circuits can not be modified.
        if evc.archived:
            message = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("delete_schedule result %s %s", message, 409)
            raise HTTPException(409, detail=message)

        # Drop the schedule from the circuit and cancel its job.
        evc.circuit_scheduler.remove(schedule)
        self.sched.cancel_job(schedule.id)

        # Save the EVC to mongodb.
        evc.sync()

        result = "Schedule removed"
        log.debug("delete_schedule result %s %s", result, 200)
        return JSONResponse(result, status_code=200)
705
706 1
    def _is_duplicated_evc(self, evc):
707
        """Verify if the circuit given is duplicated with the stored evcs.
708
709
        Args:
710
            evc (EVC): circuit to be analysed.
711
712
        Returns:
713
            boolean: True if the circuit is duplicated, otherwise False.
714
715
        """
716 1
        for circuit in tuple(self.circuits.values()):
717 1
            if not circuit.archived and circuit.shares_uni(evc):
718 1
                return True
719 1
        return False
720
721 1
    @listen_to("kytos/topology.link_up")
    def on_link_up(self, event):
        """Listen to link up events and hand them to ``handle_link_up``."""
        self.handle_link_up(event)
725
726 1
    def handle_link_up(self, event):
        """Let every enabled, non-archived circuit react to a link up.

        Circuits are visited in service level order and each one handles
        the link while holding its own lock.
        """
        log.info("Event handle_link_up %s", event.content["link"])
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_enabled() or evc.archived:
                continue
            with evc.lock:
                evc.handle_link_up(event.content["link"])
733
734 1
    @listen_to("kytos/topology.updated")
    def on_topology_update(self, event):
        """Listen to topology updates; see ``handle_topology_update``."""
        self.handle_topology_update(event)
738
739 1
    def handle_topology_update(self, event):
        """Propagate a topology update to the EVCs, dropping stale events.

        The whole operation runs under self._lock. An event older than the
        last processed one (by timestamp) is ignored. Otherwise the timestamp
        is recorded and every enabled, non-archived EVC receives the topology
        switches while holding its own lock.

        Args:
            event (KytosEvent): event with a timestamp and a "topology"
                entry in its content.
        """
        with self._lock:
            last_seen = self._topology_updated_at
            if last_seen and last_seen > event.timestamp:
                # A newer topology was already processed.
                return
            self._topology_updated_at = event.timestamp
            for evc in self.get_evcs_by_svc_level():
                if not evc.is_enabled() or evc.archived:
                    continue
                with evc.lock:
                    switches = event.content["topology"].switches
                    evc.handle_topology_update(switches)
754
755 1
    @listen_to("kytos/topology.link_down")
    def on_link_down(self, event):
        """Listen for kytos/topology.link_down and delegate to the handler.

        Args:
            event (KytosEvent): event forwarded to handle_link_down.
        """
        self.handle_link_down(event)
759
760 1
    def handle_link_down(self, event):
        """Move circuits away from a link that went down.

        Steps, in order:
          1. Split the EVCs into three groups: affected with a usable
             failover path, affected without one, and not affected.
          2. Emit the failover flows per switch to flow_manager, in batches
             of settings.BATCH_SIZE, sleeping settings.BATCH_INTERVAL after
             each round.
          3. Swap current and failover paths of the first group and announce
             the redeploy.
          4. Emit evc_affected_by_link_down for the second group.
          5. For the third group, set up a new failover path when the
             existing one uses the link.

        Args:
            event (KytosEvent): event whose content carries the "link".
        """
        link = event.content["link"]
        log.info("Event handle_link_down %s", link)

        pending_flows = {}
        failover_evcs, regular_evcs, unaffected_evcs = [], [], []
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_affected_by_link(link):
                unaffected_evcs.append(evc)
                continue
            # If there is no usable failover path, handle link down the
            # traditional way.
            if (
                not getattr(evc, 'failover_path', None)
                or evc.is_failover_path_affected_by_link(link)
            ):
                regular_evcs.append(evc)
                continue
            for dpid, flows in evc.get_failover_flows().items():
                pending_flows.setdefault(dpid, []).extend(flows)
            failover_evcs.append(evc)

        # Each round sends at most one batch per switch.
        while pending_flows:
            batch_size = settings.BATCH_SIZE or None
            for dpid in list(pending_flows):
                queued = pending_flows[dpid]
                emit_event(
                    self.controller,
                    context="kytos.flow_manager",
                    name="flows.install",
                    content={
                        "dpid": dpid,
                        "flow_dict": {"flows": queued[:batch_size]},
                    }
                )
                if batch_size is not None and batch_size < len(queued):
                    pending_flows[dpid] = queued[batch_size:]
                else:
                    del pending_flows[dpid]
            time.sleep(settings.BATCH_INTERVAL)

        for evc in failover_evcs:
            with evc.lock:
                previous_path = evc.current_path
                evc.current_path = evc.failover_path
                evc.failover_path = previous_path
                evc.sync()
            emit_event(self.controller, "redeployed_link_down",
                       content=map_evc_event_content(evc))
            log.info(
                f"{evc} redeployed with failover due to link down {link.id}"
            )

        for evc in regular_evcs:
            payload = {"link_id": link.id} | map_evc_event_content(evc)
            emit_event(
                self.controller,
                "evc_affected_by_link_down",
                content=payload
            )

        # After handling the hot path, check if new failover paths are needed.
        # EVCs affected by the link down will generate a KytosEvent for
        # deployed|redeployed, which triggers the failover path setup, so
        # only the unaffected EVCs need a further check here.
        for evc in unaffected_evcs:
            if evc.is_failover_path_affected_by_link(link):
                evc.setup_failover_path()
830
831 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
    def on_evc_affected_by_link_down(self, event):
        """Listen for evc_affected_by_link_down and delegate to the handler.

        Args:
            event (KytosEvent): event forwarded to
                handle_evc_affected_by_link_down.
        """
        self.handle_evc_affected_by_link_down(event)
835
836 1
    def handle_evc_affected_by_link_down(self, event):
        """Redeploy one EVC hit by a link down and publish the outcome.

        Unknown EVC ids are ignored. Otherwise the EVC handles the link down
        under its lock and either "redeployed_link_down" or
        "error_redeploy_link_down" is emitted, depending on the result.

        Args:
            event (KytosEvent): event whose content carries "evc_id" and
                "link_id".
        """
        content = event.content
        evc = self.circuits.get(content["evc_id"])
        link_id = content['link_id']
        if not evc:
            return
        with evc.lock:
            redeployed = evc.handle_link_down()
        if redeployed:
            log.info(f"{evc} redeployed due to link down {link_id}")
            event_name = "redeployed_link_down"
        else:
            event_name = "error_redeploy_link_down"
        emit_event(self.controller, event_name,
                   content=map_evc_event_content(evc))
850
851 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
    def on_evc_deployed(self, event):
        """Listen for EVC deployed|redeployed events and delegate them.

        Args:
            event (KytosEvent): event forwarded to handle_evc_deployed.
        """
        self.handle_evc_deployed(event)
855
856 1
    def handle_evc_deployed(self, event):
        """Set up the failover path of an EVC that was just (re)deployed.

        Nothing happens when the "evc_id" from the event content is not a
        known circuit.

        Args:
            event (KytosEvent): event whose content carries "evc_id".
        """
        evc = self.circuits.get(event.content["evc_id"])
        if evc:
            with evc.lock:
                evc.setup_failover_path()
863
864 1
    @listen_to("kytos/topology.topology_loaded")
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
        """Listen for kytos/topology.topology_loaded and load all EVCs.

        Args:
            event (KytosEvent): triggering event; its content is not used.
        """
        self.load_all_evcs()
868
869 1
    def load_all_evcs(self):
        """Load every stored circuit that is not in memory yet.

        Circuits are read from the mongo controller; ids already present in
        self.circuits are left untouched.
        """
        stored = self.mongo_controller.get_circuits()['circuits']
        for circuit_id, circuit in stored.items():
            if circuit_id in self.circuits:
                continue
            self._load_evc(circuit)
875
876 1
    def _load_evc(self, circuit_dict):
877
        """Load one EVC from mongodb to memory."""
878 1
        try:
879 1
            evc = self._evc_from_dict(circuit_dict)
880 1
        except ValueError as exception:
881 1
            log.error(
882
                f"Could not load EVC: dict={circuit_dict} error={exception}"
883
            )
884 1
            return None
885
886 1
        if evc.archived:
887 1
            return None
888 1
        evc.deactivate()
889 1
        evc.sync()
890 1
        self.circuits.setdefault(evc.id, evc)
891 1
        self.sched.add(evc)
892 1
        return evc
893
894 1
    @listen_to("kytos/flow_manager.flow.error")
    def on_flow_mod_error(self, event):
        """Listen for kytos/flow_manager.flow.error and delegate it.

        Args:
            event (KytosEvent): event forwarded to handle_flow_mod_error.
        """
        self.handle_flow_mod_error(event)
898
899 1
    def handle_flow_mod_error(self, event):
        """Remove the current flows of the EVC whose flow add failed.

        Only errors whose "error_command" is "add" are acted upon. The EVC
        is looked up from the cookie of the failing flow; unknown EVCs are
        ignored.

        Args:
            event (KytosEvent): event whose content carries "flow" and,
                optionally, "error_command".
        """
        content = event.content
        flow = content["flow"]
        if content.get("error_command") != "add":
            return
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
        if evc:
            evc.remove_current_flows()
908
909 1
    def _evc_dict_with_instances(self, evc_dict):
910
        """Convert some dict values to instance of EVC classes.
911
912
        This method will convert: [UNI, Link]
913
        """
914 1
        data = evc_dict.copy()  # Do not modify the original dict
915 1
        for attribute, value in data.items():
916
            # Get multiple attributes.
917
            # Ex: uni_a, uni_z
918 1
            if "uni" in attribute:
919 1
                try:
920 1
                    data[attribute] = self._uni_from_dict(value)
921 1
                except ValueError as exception:
922 1
                    result = "Error creating UNI: Invalid value"
923 1
                    raise ValueError(result) from exception
924
925 1
            if attribute == "circuit_scheduler":
926 1
                data[attribute] = []
927 1
                for schedule in value:
928 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
929
930
            # Get multiple attributes.
931
            # Ex: primary_links,
932
            #     backup_links,
933
            #     current_links_cache,
934
            #     primary_links_cache,
935
            #     backup_links_cache
936 1
            if "links" in attribute:
937 1
                data[attribute] = [
938
                    self._link_from_dict(link) for link in value
939
                ]
940
941
            # Ex: current_path,
942
            #     primary_path,
943
            #     backup_path
944 1
            if "path" in attribute and attribute != "dynamic_backup_path":
945 1
                data[attribute] = Path(
946
                    [self._link_from_dict(link) for link in value]
947
                )
948
949 1
        return data
950
951 1
    def _evc_from_dict(self, evc_dict):
        """Return an EVC built from a python dict.

        The dict values are first converted by _evc_dict_with_instances and
        the NApp's table_group is passed along as "table_group".
        """
        kwargs = self._evc_dict_with_instances(evc_dict)
        kwargs["table_group"] = self.table_group
        return EVC(self.controller, **kwargs)
955
956 1
    def _uni_from_dict(self, uni_dict):
957
        """Return a UNI object from python dict."""
958 1
        if uni_dict is None:
959 1
            return False
960
961 1
        interface_id = uni_dict.get("interface_id")
962 1
        interface = self.controller.get_interface_by_id(interface_id)
963 1
        if interface is None:
964 1
            result = (
965
                "Error creating UNI:"
966
                + f"Could not instantiate interface {interface_id}"
967
            )
968 1
            raise ValueError(result) from ValueError
969 1
        tag_convert = {1: "vlan"}
970 1
        tag_dict = uni_dict.get("tag", None)
971 1
        if tag_dict:
972 1
            tag_type = tag_dict.get("tag_type")
973 1
            tag_type = tag_convert.get(tag_type, tag_type)
974 1
            tag_value = tag_dict.get("value")
975 1
            tag = TAG(tag_type, tag_value)
976
        else:
977 1
            tag = None
978 1
        uni = UNI(interface, tag)
979
980 1
        return uni
981
982 1
    def _link_from_dict(self, link_dict):
983
        """Return a Link object from python dict."""
984 1
        id_a = link_dict.get("endpoint_a").get("id")
985 1
        id_b = link_dict.get("endpoint_b").get("id")
986
987 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
988 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
989 1
        if not endpoint_a:
990 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
991 1
            raise ValueError(error_msg)
992 1
        if not endpoint_b:
993
            error_msg = f"Could not get interface endpoint_b id {id_b}"
994
            raise ValueError(error_msg)
995
996 1
        link = Link(endpoint_a, endpoint_b)
997 1
        if "metadata" in link_dict:
998 1
            link.extend_metadata(link_dict.get("metadata"))
999
1000 1
        s_vlan = link.get_metadata("s_vlan")
1001 1
        if s_vlan:
1002 1
            tag = TAG.from_dict(s_vlan)
1003 1
            if tag is False:
1004
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
1005
                raise ValueError(error_msg)
1006 1
            link.update_metadata("s_vlan", tag)
1007 1
        return link
1008
1009 1
    def _find_evc_by_schedule_id(self, schedule_id):
1010
        """
1011
        Find an EVC and CircuitSchedule based on schedule_id.
1012
1013
        :param schedule_id: Schedule ID
1014
        :return: EVC and Schedule
1015
        """
1016 1
        circuits = self._get_circuits_buffer()
1017 1
        found_schedule = None
1018 1
        evc = None
1019
1020
        # pylint: disable=unused-variable
1021 1
        for c_id, circuit in circuits.items():
1022 1
            for schedule in circuit.circuit_scheduler:
1023 1
                if schedule.id == schedule_id:
1024 1
                    found_schedule = schedule
1025 1
                    evc = circuit
1026 1
                    break
1027 1
            if found_schedule:
1028 1
                break
1029 1
        return evc, found_schedule
1030
1031 1
    def _get_circuits_buffer(self):
1032
        """
1033
        Return the circuit buffer.
1034
1035
        If the buffer is empty, try to load data from mongodb.
1036
        """
1037 1
        if not self.circuits:
1038
            # Load circuits from mongodb to buffer
1039 1
            circuits = self.mongo_controller.get_circuits()['circuits']
1040 1
            for c_id, circuit in circuits.items():
1041 1
                evc = self._evc_from_dict(circuit)
1042 1
                self.circuits[c_id] = evc
1043 1
        return self.circuits
1044
1045
    # pylint: disable=attribute-defined-outside-init
1046 1
    @alisten_to("kytos/of_multi_table.enable_table")
    async def on_table_enabled(self, event):
        """Apply the table groups enabled for mef_eline and announce them.

        The "mef_eline" entry of the event content is read. If it is missing
        or empty, nothing is done. If any group is not listed in
        settings.TABLE_GROUP_ALLOWED, an error is logged and nothing is
        applied. Otherwise self.table_group is updated and
        kytos/mef_eline.enable_table is emitted with the resulting groups.
        """
        table_group = event.content.get("mef_eline")
        if not table_group:
            return
        for group in table_group:
            if group in settings.TABLE_GROUP_ALLOWED:
                continue
            log.error(f'The table group "{group}" is not allowed for '
                      f'mef_eline. Allowed table groups are '
                      f'{settings.TABLE_GROUP_ALLOWED}')
            return
        self.table_group.update(table_group)
        await aemit_event(
            self.controller,
            "kytos/mef_eline.enable_table",
            {"group_table": self.table_group},
        )
1062