Passed
Pull Request — master (#320)
by Vinicius
06:19
created

build.main.Main.get_circuit()   A

Complexity

Conditions 2

Size

Total Lines 13
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 12
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 12
nop 2
dl 0
loc 13
ccs 12
cts 12
cp 1
crap 2
rs 9.8
c 0
b 0
f 0
1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
from threading import Lock
9
10 1
from pydantic import ValidationError
11
12 1
from kytos.core import KytosNApp, log, rest
13 1
from kytos.core.helpers import (alisten_to, listen_to, load_spec,
14
                                validate_openapi)
15 1
from kytos.core.interface import TAG, UNI
16 1
from kytos.core.link import Link
17 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
18
                                 get_json_or_400)
19 1
from napps.kytos.mef_eline import controllers, settings
20 1
from napps.kytos.mef_eline.exceptions import InvalidPath
21 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
22
                                          Path)
23 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
24 1
from napps.kytos.mef_eline.utils import (aemit_event, emit_event,
25
                                         map_evc_event_content)
26
27
28
# pylint: disable=too-many-public-methods
29 1
class Main(KytosNApp):
30
    """Main class of amlight/mef_eline NApp.
31
32
    This class is the entry point for this napp.
33
    """
34
35 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
36
37 1
    def setup(self):
38
        """Replace the '__init__' method for the KytosNApp subclass.
39
40
        The setup method is automatically called by the controller when your
41
        application is loaded.
42
43
        So, if you have any setup routine, insert it here.
44
        """
45
        # object used to scheduler circuit events
46 1
        self.sched = Scheduler()
47
48
        # object to save and load circuits
49 1
        self.mongo_controller = self.get_eline_controller()
50 1
        self.mongo_controller.bootstrap_indexes()
51
52
        # set the controller that will manager the dynamic paths
53 1
        DynamicPathManager.set_controller(self.controller)
54
55
        # dictionary of EVCs created. It acts as a circuit buffer.
56
        # Every create/update/delete must be synced to mongodb.
57 1
        self.circuits = {}
58
59 1
        self.table_group = {"epl": 0, "evpl": 0}
60 1
        self._lock = Lock()
61 1
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)
62
63 1
        self.load_all_evcs()
64
65 1
    def get_evcs_by_svc_level(self) -> list:
66
        """Get circuits sorted by desc service level and asc creation_time.
67
68
        In the future, as more ops are offloaded it should be get from the DB.
69
        """
70 1
        return sorted(self.circuits.values(),
71
                      key=lambda x: (-x.service_level, x.creation_time))
72
73 1
    @staticmethod
74 1
    def get_eline_controller():
75
        """Return the ELineController instance."""
76
        return controllers.ELineController()
77
78 1
    def execute(self):
79
        """Execute once when the napp is running."""
80 1
        if self._lock.locked():
81 1
            return
82 1
        log.debug("Starting consistency routine")
83 1
        with self._lock:
84 1
            self.execute_consistency()
85 1
        log.debug("Finished consistency routine")
86
87 1
    @staticmethod
88 1
    def should_be_checked(circuit):
89
        "Verify if the circuit meets the necessary conditions to be checked"
90
        # pylint: disable=too-many-boolean-expressions
91 1
        if (
92
                circuit.is_enabled()
93
                and not circuit.is_active()
94
                and not circuit.lock.locked()
95
                and not circuit.has_recent_removed_flow()
96
                and not circuit.is_recent_updated()
97
                # if a inter-switch EVC does not have current_path, it does not
98
                # make sense to run sdntrace on it
99
                and (circuit.is_intra_switch() or circuit.current_path)
100
                ):
101 1
            return True
102
        return False
103
104 1
    def execute_consistency(self):
105
        """Execute consistency routine."""
106 1
        circuits_to_check = []
107 1
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
108 1
        for circuit in self.get_evcs_by_svc_level():
109 1
            stored_circuits.pop(circuit.id, None)
110 1
            if self.should_be_checked(circuit):
111 1
                circuits_to_check.append(circuit)
112 1
        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
113 1
        for circuit in circuits_to_check:
114 1
            is_checked = circuits_checked.get(circuit.id)
115 1
            if is_checked:
116 1
                circuit.execution_rounds = 0
117 1
                log.info(f"{circuit} enabled but inactive - activating")
118 1
                with circuit.lock:
119 1
                    circuit.activate()
120 1
                    circuit.sync()
121
            else:
122 1
                circuit.execution_rounds += 1
123 1
                if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
124 1
                    log.info(f"{circuit} enabled but inactive - redeploy")
125 1
                    with circuit.lock:
126 1
                        circuit.deploy()
127 1
        for circuit_id in stored_circuits:
128 1
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
129 1
            self._load_evc(stored_circuits[circuit_id])
130
131 1
    def shutdown(self):
132
        """Execute when your napp is unloaded.
133
134
        If you have some cleanup procedure, insert it here.
135
        """
136
137 1
    @rest("/v2/evc/", methods=["GET"])
138 1
    def list_circuits(self, request: Request) -> JSONResponse:
139
        """Endpoint to return circuits stored.
140
141
        archive query arg if defined (not null) will be filtered
142
        accordingly, by default only non archived evcs will be listed
143
        """
144 1
        log.debug("list_circuits /v2/evc")
145 1
        args = request.query_params
146 1
        archived = args.get("archived", "false").lower()
147 1
        args = {k: v for k, v in args.items() if k not in {"archived"}}
148 1
        circuits = self.mongo_controller.get_circuits(archived=archived,
149
                                                      metadata=args)
150 1
        circuits = circuits['circuits']
151 1
        return JSONResponse(circuits)
152
153 1
    @rest("/v2/evc/schedule", methods=["GET"])
154 1
    def list_schedules(self, _request: Request) -> JSONResponse:
155
        """Endpoint to return all schedules stored for all circuits.
156
157
        Return a JSON with the following template:
158
        [{"schedule_id": <schedule_id>,
159
         "circuit_id": <circuit_id>,
160
         "schedule": <schedule object>}]
161
        """
162 1
        log.debug("list_schedules /v2/evc/schedule")
163 1
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
164 1
        if not circuits:
165 1
            result = {}
166 1
            status = 200
167 1
            return JSONResponse(result, status_code=status)
168
169 1
        result = []
170 1
        status = 200
171 1
        for circuit in circuits:
172 1
            circuit_scheduler = circuit.get("circuit_scheduler")
173 1
            if circuit_scheduler:
174 1
                for scheduler in circuit_scheduler:
175 1
                    value = {
176
                        "schedule_id": scheduler.get("id"),
177
                        "circuit_id": circuit.get("id"),
178
                        "schedule": scheduler,
179
                    }
180 1
                    result.append(value)
181
182 1
        log.debug("list_schedules result %s %s", result, status)
183 1
        return JSONResponse(result, status_code=status)
184
185 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
186 1
    def get_circuit(self, request: Request) -> JSONResponse:
187
        """Endpoint to return a circuit based on id."""
188 1
        circuit_id = request.path_params["circuit_id"]
189 1
        log.debug("get_circuit /v2/evc/%s", circuit_id)
190 1
        circuit = self.mongo_controller.get_circuit(circuit_id)
191 1
        if not circuit:
192 1
            result = f"circuit_id {circuit_id} not found"
193 1
            log.debug("get_circuit result %s %s", result, 404)
194 1
            raise HTTPException(404, detail=result)
195 1
        status = 200
196 1
        log.debug("get_circuit result %s %s", circuit, status)
197 1
        return JSONResponse(circuit, status_code=status)
198
199 1
    @rest("/v2/evc/", methods=["POST"])
200 1
    @validate_openapi(spec)
201 1
    def create_circuit(self, request: Request) -> JSONResponse:
202
        """Try to create a new circuit.
203
204
        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
205
        UNI_Z's requested C-VID are available from the interfaces' pools. This
206
        is checked when creating the UNI object.
207
208
        Then, E-Line NApp requests a primary and a backup path to the
209
        Pathfinder NApp using the attributes primary_links and backup_links
210
        submitted via REST
211
212
        # For each link composing paths in #3:
213
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
214
        #  - Using the S-VID obtained, generate abstract flow entries to be
215
        #    sent to FlowManager
216
217
        Push abstract flow entries to FlowManager and FlowManager pushes
218
        OpenFlow entries to datapaths
219
220
        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
221
        creation
222
223
        Finnaly, notify user of the status of its request.
224
        """
225
        # Try to create the circuit object
226 1
        log.debug("create_circuit /v2/evc/")
227 1
        data = get_json_or_400(request, self.controller.loop)
228
229 1
        try:
230 1
            evc = self._evc_from_dict(data)
231 1
        except ValueError as exception:
232 1
            log.debug("create_circuit result %s %s", exception, 400)
233 1
            raise HTTPException(400, detail=str(exception)) from exception
234
235 1
        if evc.primary_path:
236
            try:
237
                evc.primary_path.is_valid(
238
                    evc.uni_a.interface.switch,
239
                    evc.uni_z.interface.switch,
240
                    bool(evc.circuit_scheduler),
241
                )
242
            except InvalidPath as exception:
243
                raise HTTPException(
244
                    400,
245
                    detail=f"primary_path is not valid: {exception}"
246
                ) from exception
247 1
        if evc.backup_path:
248
            try:
249
                evc.backup_path.is_valid(
250
                    evc.uni_a.interface.switch,
251
                    evc.uni_z.interface.switch,
252
                    bool(evc.circuit_scheduler),
253
                )
254
            except InvalidPath as exception:
255
                raise HTTPException(
256
                    400,
257
                    detail=f"backup_path is not valid: {exception}"
258
                ) from exception
259
260
        # verify duplicated evc
261 1
        if self._is_duplicated_evc(evc):
262 1
            result = "The EVC already exists."
263 1
            log.debug("create_circuit result %s %s", result, 409)
264 1
            raise HTTPException(409, detail=result)
265
266 1
        try:
267 1
            evc._validate_has_primary_or_dynamic()
268 1
        except ValueError as exception:
269 1
            raise HTTPException(400, detail=str(exception)) from exception
270
271
        # save circuit
272 1
        try:
273 1
            evc.sync()
274
        except ValidationError as exception:
275
            raise HTTPException(400, detail=str(exception)) from exception
276
277
        # store circuit in dictionary
278 1
        self.circuits[evc.id] = evc
279
280
        # Schedule the circuit deploy
281 1
        self.sched.add(evc)
282
283
        # Circuit has no schedule, deploy now
284 1
        if not evc.circuit_scheduler:
285 1
            with evc.lock:
286 1
                evc.deploy()
287
288
        # Notify users
289 1
        result = {"circuit_id": evc.id}
290 1
        status = 201
291 1
        log.debug("create_circuit result %s %s", result, status)
292 1
        emit_event(self.controller, name="created",
293
                   content=map_evc_event_content(evc))
294 1
        return JSONResponse(result, status_code=status)
295
296 1
    @listen_to('kytos/flow_manager.flow.removed')
297 1
    def on_flow_delete(self, event):
298
        """Capture delete messages to keep track when flows got removed."""
299
        self.handle_flow_delete(event)
300
301 1
    def handle_flow_delete(self, event):
302
        """Keep track when the EVC got flows removed by deriving its cookie."""
303 1
        flow = event.content["flow"]
304 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
305 1
        if evc:
306 1
            log.debug("Flow removed in EVC %s", evc.id)
307 1
            evc.set_flow_removed_at()
308
309 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
310 1
    @validate_openapi(spec)
311 1
    def update(self, request: Request) -> JSONResponse:
312
        """Update a circuit based on payload.
313
314
        The EVC attributes (creation_time, active, current_path,
315
        failover_path, _id, archived) can't be updated.
316
        """
317 1
        data = get_json_or_400(request, self.controller.loop)
318 1
        circuit_id = request.path_params["circuit_id"]
319 1
        log.debug("update /v2/evc/%s", circuit_id)
320 1
        try:
321 1
            evc = self.circuits[circuit_id]
322 1
        except KeyError:
323 1
            result = f"circuit_id {circuit_id} not found"
324 1
            log.debug("update result %s %s", result, 404)
325 1
            raise HTTPException(404, detail=result) from KeyError
326
327 1
        if evc.archived:
328 1
            result = "Can't update archived EVC"
329 1
            log.debug("update result %s %s", result, 409)
330 1
            raise HTTPException(409, detail=result)
331
332 1
        try:
333 1
            enable, redeploy = evc.update(
334
                **self._evc_dict_with_instances(data)
335
            )
336 1
        except ValidationError as exception:
337
            raise HTTPException(400, detail=str(exception)) from exception
338 1
        except ValueError as exception:
339 1
            log.error(exception)
340 1
            log.debug("update result %s %s", exception, 400)
341 1
            raise HTTPException(400, detail=str(exception)) from exception
342
343 1
        if evc.is_active():
344
            if enable is False:  # disable if active
345
                with evc.lock:
346
                    evc.remove()
347
            elif redeploy is not None:  # redeploy if active
348
                with evc.lock:
349
                    evc.remove()
350
                    evc.deploy()
351
        else:
352 1
            if enable is True:  # enable if inactive
353 1
                with evc.lock:
354 1
                    evc.deploy()
355 1
        result = {evc.id: evc.as_dict()}
356 1
        status = 200
357
358 1
        log.debug("update result %s %s", result, status)
359 1
        emit_event(self.controller, "updated",
360
                   content=map_evc_event_content(evc, **data))
361 1
        return JSONResponse(result, status_code=status)
362
363 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
364 1
    def delete_circuit(self, request: Request) -> JSONResponse:
365
        """Remove a circuit.
366
367
        First, the flows are removed from the switches, and then the EVC is
368
        disabled.
369
        """
370 1
        circuit_id = request.path_params["circuit_id"]
371 1
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
372 1
        try:
373 1
            evc = self.circuits[circuit_id]
374 1
        except KeyError:
375 1
            result = f"circuit_id {circuit_id} not found"
376 1
            log.debug("delete_circuit result %s %s", result, 404)
377 1
            raise HTTPException(404, detail=result) from KeyError
378
379 1
        if evc.archived:
380 1
            result = f"Circuit {circuit_id} already removed"
381 1
            log.debug("delete_circuit result %s %s", result, 404)
382 1
            raise HTTPException(404, detail=result)
383
384 1
        log.info("Removing %s", evc)
385 1
        with evc.lock:
386 1
            evc.remove_current_flows()
387 1
            evc.remove_failover_flows(sync=False)
388 1
            evc.deactivate()
389 1
            evc.disable()
390 1
            self.sched.remove(evc)
391 1
            evc.archive()
392 1
            evc.sync()
393 1
        log.info("EVC removed. %s", evc)
394 1
        result = {"response": f"Circuit {circuit_id} removed"}
395 1
        status = 200
396
397 1
        log.debug("delete_circuit result %s %s", result, status)
398 1
        emit_event(self.controller, "deleted",
399
                   content=map_evc_event_content(evc))
400 1
        return JSONResponse(result, status_code=status)
401
402 1
    @rest("v2/evc/{circuit_id}/metadata", methods=["GET"])
403 1
    def get_metadata(self, request: Request) -> JSONResponse:
404
        """Get metadata from an EVC."""
405 1
        circuit_id = request.path_params["circuit_id"]
406 1
        try:
407 1
            return (
408
                JSONResponse({"metadata": self.circuits[circuit_id].metadata})
409
            )
410
        except KeyError as error:
411
            raise HTTPException(
412
                404,
413
                detail=f"circuit_id {circuit_id} not found."
414
            ) from error
415
416 1 View Code Duplication
    @rest("v2/evc/metadata", methods=["POST"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
417 1
    @validate_openapi(spec)
418 1
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
419
        """Add metadata to a bulk of EVCs."""
420 1
        data = get_json_or_400(request, self.controller.loop)
421 1
        circuit_ids = data.pop("circuit_ids")
422
423 1
        self.mongo_controller.update_evcs(circuit_ids, data, "add")
424
425 1
        fail_evcs = []
426 1
        for _id in circuit_ids:
427 1
            try:
428 1
                evc = self.circuits[_id]
429 1
                evc.extend_metadata(data)
430 1
            except KeyError:
431 1
                fail_evcs.append(_id)
432
433 1
        if fail_evcs:
434 1
            raise HTTPException(404, detail=fail_evcs)
435 1
        return JSONResponse("Operation successful", status_code=201)
436
437 1
    @rest("v2/evc/{circuit_id}/metadata", methods=["POST"])
438 1
    @validate_openapi(spec)
439 1
    def add_metadata(self, request: Request) -> JSONResponse:
440
        """Add metadata to an EVC."""
441 1
        circuit_id = request.path_params["circuit_id"]
442 1
        metadata = get_json_or_400(request, self.controller.loop)
443 1
        if not isinstance(metadata, dict):
444
            raise HTTPException(400, "Invalid metadata value: {metadata}")
445 1
        try:
446 1
            evc = self.circuits[circuit_id]
447 1
        except KeyError as error:
448 1
            raise HTTPException(
449
                404,
450
                detail=f"circuit_id {circuit_id} not found."
451
            ) from error
452
453 1
        evc.extend_metadata(metadata)
454 1
        evc.sync()
455 1
        return JSONResponse("Operation successful", status_code=201)
456
457 1 View Code Duplication
    @rest("v2/evc/metadata/{key}", methods=["DELETE"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
458 1
    @validate_openapi(spec)
459 1
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
460
        """Delete metada from a bulk of EVCs"""
461 1
        data = get_json_or_400(request, self.controller.loop)
462 1
        key = request.path_params["key"]
463 1
        circuit_ids = data.pop("circuit_ids")
464 1
        self.mongo_controller.update_evcs(circuit_ids, {key: ""}, "del")
465
466 1
        fail_evcs = []
467 1
        for _id in circuit_ids:
468 1
            try:
469 1
                evc = self.circuits[_id]
470 1
                evc.remove_metadata(key)
471
            except KeyError:
472
                fail_evcs.append(_id)
473
474 1
        if fail_evcs:
475
            raise HTTPException(404, detail=fail_evcs)
476 1
        return JSONResponse("Operation successful")
477
478 1
    @rest("v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
479 1
    def delete_metadata(self, request: Request) -> JSONResponse:
480
        """Delete metadata from an EVC."""
481 1
        circuit_id = request.path_params["circuit_id"]
482 1
        key = request.path_params["key"]
483 1
        try:
484 1
            evc = self.circuits[circuit_id]
485 1
        except KeyError as error:
486 1
            raise HTTPException(
487
                404,
488
                detail=f"circuit_id {circuit_id} not found."
489
            ) from error
490
491 1
        evc.remove_metadata(key)
492 1
        evc.sync()
493 1
        return JSONResponse("Operation successful")
494
495 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
496 1
    def redeploy(self, request: Request) -> JSONResponse:
497
        """Endpoint to force the redeployment of an EVC."""
498 1
        circuit_id = request.path_params["circuit_id"]
499 1
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
500 1
        try:
501 1
            evc = self.circuits[circuit_id]
502 1
        except KeyError:
503 1
            raise HTTPException(
504
                404,
505
                detail=f"circuit_id {circuit_id} not found"
506
            ) from KeyError
507 1
        if evc.is_enabled():
508 1
            with evc.lock:
509 1
                evc.remove_current_flows()
510 1
                evc.deploy()
511 1
            result = {"response": f"Circuit {circuit_id} redeploy received."}
512 1
            status = 202
513
        else:
514 1
            result = {"response": f"Circuit {circuit_id} is disabled."}
515 1
            status = 409
516
517 1
        return JSONResponse(result, status_code=status)
518
519 1
    @rest("/v2/evc/schedule/", methods=["POST"])
520 1
    @validate_openapi(spec)
521 1
    def create_schedule(self, request: Request) -> JSONResponse:
522
        """
523
        Create a new schedule for a given circuit.
524
525
        This service do no check if there are conflicts with another schedule.
526
        Payload example:
527
            {
528
              "circuit_id":"aa:bb:cc",
529
              "schedule": {
530
                "date": "2019-08-07T14:52:10.967Z",
531
                "interval": "string",
532
                "frequency": "1 * * * *",
533
                "action": "create"
534
              }
535
            }
536
        """
537 1
        log.debug("create_schedule /v2/evc/schedule/")
538 1
        data = get_json_or_400(request, self.controller.loop)
539 1
        circuit_id = data["circuit_id"]
540 1
        schedule_data = data["schedule"]
541
542
        # Get EVC from circuits buffer
543 1
        circuits = self._get_circuits_buffer()
544
545
        # get the circuit
546 1
        evc = circuits.get(circuit_id)
547
548
        # get the circuit
549 1
        if not evc:
550 1
            result = f"circuit_id {circuit_id} not found"
551 1
            log.debug("create_schedule result %s %s", result, 404)
552 1
            raise HTTPException(404, detail=result)
553
        # Can not modify circuits deleted and archived
554 1
        if evc.archived:
555 1
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
556 1
            log.debug("create_schedule result %s %s", result, 409)
557 1
            raise HTTPException(409, detail=result)
558
559
        # new schedule from dict
560 1
        new_schedule = CircuitSchedule.from_dict(schedule_data)
561
562
        # If there is no schedule, create the list
563 1
        if not evc.circuit_scheduler:
564 1
            evc.circuit_scheduler = []
565
566
        # Add the new schedule
567 1
        evc.circuit_scheduler.append(new_schedule)
568
569
        # Add schedule job
570 1
        self.sched.add_circuit_job(evc, new_schedule)
571
572
        # save circuit to mongodb
573 1
        evc.sync()
574
575 1
        result = new_schedule.as_dict()
576 1
        status = 201
577
578 1
        log.debug("create_schedule result %s %s", result, status)
579 1
        return JSONResponse(result, status_code=status)
580
581 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
582 1
    @validate_openapi(spec)
583 1
    def update_schedule(self, request: Request) -> JSONResponse:
584
        """Update a schedule.
585
586
        Change all attributes from the given schedule from a EVC circuit.
587
        The schedule ID is preserved as default.
588
        Payload example:
589
            {
590
              "date": "2019-08-07T14:52:10.967Z",
591
              "interval": "string",
592
              "frequency": "1 * * *",
593
              "action": "create"
594
            }
595
        """
596 1
        data = get_json_or_400(request, self.controller.loop)
597 1
        schedule_id = request.path_params["schedule_id"]
598 1
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)
599
600
        # Try to find a circuit schedule
601 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
602
603
        # Can not modify circuits deleted and archived
604 1
        if not found_schedule:
605 1
            result = f"schedule_id {schedule_id} not found"
606 1
            log.debug("update_schedule result %s %s", result, 404)
607 1
            raise HTTPException(404, detail=result)
608 1
        if evc.archived:
609 1
            result = f"Circuit {evc.id} is archived. Update is forbidden."
610 1
            log.debug("update_schedule result %s %s", result, 409)
611 1
            raise HTTPException(409, detail=result)
612
613 1
        new_schedule = CircuitSchedule.from_dict(data)
614 1
        new_schedule.id = found_schedule.id
615
        # Remove the old schedule
616 1
        evc.circuit_scheduler.remove(found_schedule)
617
        # Append the modified schedule
618 1
        evc.circuit_scheduler.append(new_schedule)
619
620
        # Cancel all schedule jobs
621 1
        self.sched.cancel_job(found_schedule.id)
622
        # Add the new circuit schedule
623 1
        self.sched.add_circuit_job(evc, new_schedule)
624
        # Save EVC to mongodb
625 1
        evc.sync()
626
627 1
        result = new_schedule.as_dict()
628 1
        status = 200
629
630 1
        log.debug("update_schedule result %s %s", result, status)
631 1
        return JSONResponse(result, status_code=status)
632
633 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
634 1
    def delete_schedule(self, request: Request) -> JSONResponse:
635
        """Remove a circuit schedule.
636
637
        Remove the Schedule from EVC.
638
        Remove the Schedule from cron job.
639
        Save the EVC to the Storehouse.
640
        """
641 1
        schedule_id = request.path_params["schedule_id"]
642 1
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
643 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
644
645
        # Can not modify circuits deleted and archived
646 1
        if not found_schedule:
647 1
            result = f"schedule_id {schedule_id} not found"
648 1
            log.debug("delete_schedule result %s %s", result, 404)
649 1
            raise HTTPException(404, detail=result)
650
651 1
        if evc.archived:
652 1
            result = f"Circuit {evc.id} is archived. Update is forbidden."
653 1
            log.debug("delete_schedule result %s %s", result, 409)
654 1
            raise HTTPException(409, detail=result)
655
656
        # Remove the old schedule
657 1
        evc.circuit_scheduler.remove(found_schedule)
658
659
        # Cancel all schedule jobs
660 1
        self.sched.cancel_job(found_schedule.id)
661
        # Save EVC to mongodb
662 1
        evc.sync()
663
664 1
        result = "Schedule removed"
665 1
        status = 200
666
667 1
        log.debug("delete_schedule result %s %s", result, status)
668 1
        return JSONResponse(result, status_code=status)
669
670 1
    def _is_duplicated_evc(self, evc):
671
        """Verify if the circuit given is duplicated with the stored evcs.
672
673
        Args:
674
            evc (EVC): circuit to be analysed.
675
676
        Returns:
677
            boolean: True if the circuit is duplicated, otherwise False.
678
679
        """
680 1
        for circuit in tuple(self.circuits.values()):
681 1
            if not circuit.archived and circuit.shares_uni(evc):
682 1
                return True
683 1
        return False
684
685 1
    @listen_to("kytos/topology.link_up")
686 1
    def on_link_up(self, event):
687
        """Change circuit when link is up or end_maintenance."""
688
        self.handle_link_up(event)
689
690 1
    def handle_link_up(self, event):
691
        """Change circuit when link is up or end_maintenance."""
692 1
        log.info("Event handle_link_up %s", event.content["link"])
693 1
        for evc in self.get_evcs_by_svc_level():
694 1
            if evc.is_enabled() and not evc.archived:
695 1
                with evc.lock:
696 1
                    evc.handle_link_up(event.content["link"])
697
698 1
    @listen_to("kytos/topology.link_down")
699 1
    def on_link_down(self, event):
700
        """Change circuit when link is down or under_mantenance."""
701
        self.handle_link_down(event)
702
703 1
    def handle_link_down(self, event):
704
        """Change circuit when link is down or under_mantenance."""
705 1
        link = event.content["link"]
706 1
        log.info("Event handle_link_down %s", link)
707 1
        switch_flows = {}
708 1
        evcs_with_failover = []
709 1
        evcs_normal = []
710 1
        check_failover = []
711 1
        for evc in self.get_evcs_by_svc_level():
712 1
            if evc.is_affected_by_link(link):
713
                # if there is no failover path, handles link down the
714
                # tradditional way
715 1
                if (
716
                    not getattr(evc, 'failover_path', None) or
717
                    evc.is_failover_path_affected_by_link(link)
718
                ):
719 1
                    evcs_normal.append(evc)
720 1
                    continue
721 1
                for dpid, flows in evc.get_failover_flows().items():
722 1
                    switch_flows.setdefault(dpid, [])
723 1
                    switch_flows[dpid].extend(flows)
724 1
                evcs_with_failover.append(evc)
725
            else:
726 1
                check_failover.append(evc)
727
728 1
        offset = 0
729 1
        while switch_flows:
730 1
            offset = (offset + settings.BATCH_SIZE) or None
731 1
            switches = list(switch_flows.keys())
732 1
            for dpid in switches:
733 1
                emit_event(
734
                    self.controller,
735
                    context="kytos.flow_manager",
736
                    name="flows.install",
737
                    content={
738
                        "dpid": dpid,
739
                        "flow_dict": {"flows": switch_flows[dpid][:offset]},
740
                    }
741
                )
742 1
                if offset is None or offset >= len(switch_flows[dpid]):
743 1
                    del switch_flows[dpid]
744 1
                    continue
745 1
                switch_flows[dpid] = switch_flows[dpid][offset:]
746 1
            time.sleep(settings.BATCH_INTERVAL)
747
748 1
        for evc in evcs_with_failover:
749 1
            with evc.lock:
750 1
                old_path = evc.current_path
751 1
                evc.current_path = evc.failover_path
752 1
                evc.failover_path = old_path
753 1
                evc.sync()
754 1
            emit_event(self.controller, "redeployed_link_down",
755
                       content=map_evc_event_content(evc))
756 1
            log.info(
757
                f"{evc} redeployed with failover due to link down {link.id}"
758
            )
759
760 1
        for evc in evcs_normal:
761 1
            emit_event(
762
                self.controller,
763
                "evc_affected_by_link_down",
764
                content={"link_id": link.id} | map_evc_event_content(evc)
765
            )
766
767
        # After handling the hot path, check if new failover paths are needed.
768
        # Note that EVCs affected by link down will generate a KytosEvent for
769
        # deployed|redeployed, which will trigger the failover path setup.
770
        # Thus, we just need to further check the check_failover list
771 1
        for evc in check_failover:
772 1
            if evc.is_failover_path_affected_by_link(link):
773 1
                evc.setup_failover_path()
774
775 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
776 1
    def on_evc_affected_by_link_down(self, event):
777
        """Change circuit when link is down or under_mantenance."""
778
        self.handle_evc_affected_by_link_down(event)
779
780 1
    def handle_evc_affected_by_link_down(self, event):
781
        """Change circuit when link is down or under_mantenance."""
782 1
        evc = self.circuits.get(event.content["evc_id"])
783 1
        link_id = event.content['link_id']
784 1
        if not evc:
785 1
            return
786 1
        with evc.lock:
787 1
            result = evc.handle_link_down()
788 1
        event_name = "error_redeploy_link_down"
789 1
        if result:
790 1
            log.info(f"{evc} redeployed due to link down {link_id}")
791 1
            event_name = "redeployed_link_down"
792 1
        emit_event(self.controller, event_name,
793
                   content=map_evc_event_content(evc))
794
795 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
796 1
    def on_evc_deployed(self, event):
797
        """Handle EVC deployed|redeployed_link_down."""
798
        self.handle_evc_deployed(event)
799
800 1
    def handle_evc_deployed(self, event):
801
        """Setup failover path on evc deployed."""
802 1
        evc = self.circuits.get(event.content["evc_id"])
803 1
        if not evc:
804 1
            return
805 1
        with evc.lock:
806 1
            evc.setup_failover_path()
807
808 1
    @listen_to("kytos/topology.topology_loaded")
809 1
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
810
        """Load EVCs once the topology is available."""
811
        self.load_all_evcs()
812
813 1
    def load_all_evcs(self):
814
        """Try to load all EVCs on startup."""
815 1
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
816 1
        for circuit_id, circuit in circuits:
817 1
            if circuit_id not in self.circuits:
818 1
                self._load_evc(circuit)
819
820 1
    def _load_evc(self, circuit_dict):
821
        """Load one EVC from mongodb to memory."""
822 1
        try:
823 1
            evc = self._evc_from_dict(circuit_dict)
824 1
        except ValueError as exception:
825 1
            log.error(
826
                f"Could not load EVC: dict={circuit_dict} error={exception}"
827
            )
828 1
            return None
829
830 1
        if evc.archived:
831 1
            return None
832 1
        evc.deactivate()
833 1
        evc.sync()
834 1
        self.circuits.setdefault(evc.id, evc)
835 1
        self.sched.add(evc)
836 1
        return evc
837
838 1
    @listen_to("kytos/flow_manager.flow.error")
839 1
    def on_flow_mod_error(self, event):
840
        """Handle flow mod errors related to an EVC."""
841
        self.handle_flow_mod_error(event)
842
843 1
    def handle_flow_mod_error(self, event):
844
        """Handle flow mod errors related to an EVC."""
845 1
        flow = event.content["flow"]
846 1
        command = event.content.get("error_command")
847 1
        if command != "add":
848
            return
849 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
850 1
        if evc:
851 1
            evc.remove_current_flows()
852
853 1
    def _evc_dict_with_instances(self, evc_dict):
854
        """Convert some dict values to instance of EVC classes.
855
856
        This method will convert: [UNI, Link]
857
        """
858 1
        data = evc_dict.copy()  # Do not modify the original dict
859 1
        for attribute, value in data.items():
860
            # Get multiple attributes.
861
            # Ex: uni_a, uni_z
862 1
            if "uni" in attribute:
863 1
                try:
864 1
                    data[attribute] = self._uni_from_dict(value)
865 1
                except ValueError as exception:
866 1
                    result = "Error creating UNI: Invalid value"
867 1
                    raise ValueError(result) from exception
868
869 1
            if attribute == "circuit_scheduler":
870 1
                data[attribute] = []
871 1
                for schedule in value:
872 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
873
874
            # Get multiple attributes.
875
            # Ex: primary_links,
876
            #     backup_links,
877
            #     current_links_cache,
878
            #     primary_links_cache,
879
            #     backup_links_cache
880 1
            if "links" in attribute:
881 1
                data[attribute] = [
882
                    self._link_from_dict(link) for link in value
883
                ]
884
885
            # Ex: current_path,
886
            #     primary_path,
887
            #     backup_path
888 1
            if "path" in attribute and attribute != "dynamic_backup_path":
889 1
                data[attribute] = Path(
890
                    [self._link_from_dict(link) for link in value]
891
                )
892
893 1
        return data
894
895 1
    def _evc_from_dict(self, evc_dict):
896 1
        data = self._evc_dict_with_instances(evc_dict)
897 1
        data["table_group"] = self.table_group
898 1
        return EVC(self.controller, **data)
899
900 1
    def _uni_from_dict(self, uni_dict):
901
        """Return a UNI object from python dict."""
902 1
        if uni_dict is None:
903 1
            return False
904
905 1
        interface_id = uni_dict.get("interface_id")
906 1
        interface = self.controller.get_interface_by_id(interface_id)
907 1
        if interface is None:
908 1
            result = (
909
                "Error creating UNI:"
910
                + f"Could not instantiate interface {interface_id}"
911
            )
912 1
            raise ValueError(result) from ValueError
913
914 1
        tag_dict = uni_dict.get("tag", None)
915 1
        if tag_dict:
916 1
            tag = TAG.from_dict(tag_dict)
917
        else:
918 1
            tag = None
919 1
        uni = UNI(interface, tag)
920
921 1
        return uni
922
923 1
    def _link_from_dict(self, link_dict):
924
        """Return a Link object from python dict."""
925 1
        id_a = link_dict.get("endpoint_a").get("id")
926 1
        id_b = link_dict.get("endpoint_b").get("id")
927
928 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
929 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
930 1
        if not endpoint_a:
931 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
932 1
            raise ValueError(error_msg)
933 1
        if not endpoint_b:
934
            error_msg = f"Could not get interface endpoint_b id {id_b}"
935
            raise ValueError(error_msg)
936
937 1
        link = Link(endpoint_a, endpoint_b)
938 1
        if "metadata" in link_dict:
939 1
            link.extend_metadata(link_dict.get("metadata"))
940
941 1
        s_vlan = link.get_metadata("s_vlan")
942 1
        if s_vlan:
943 1
            tag = TAG.from_dict(s_vlan)
944 1
            if tag is False:
945
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
946
                raise ValueError(error_msg)
947 1
            link.update_metadata("s_vlan", tag)
948 1
        return link
949
950 1
    def _find_evc_by_schedule_id(self, schedule_id):
951
        """
952
        Find an EVC and CircuitSchedule based on schedule_id.
953
954
        :param schedule_id: Schedule ID
955
        :return: EVC and Schedule
956
        """
957 1
        circuits = self._get_circuits_buffer()
958 1
        found_schedule = None
959 1
        evc = None
960
961
        # pylint: disable=unused-variable
962 1
        for c_id, circuit in circuits.items():
963 1
            for schedule in circuit.circuit_scheduler:
964 1
                if schedule.id == schedule_id:
965 1
                    found_schedule = schedule
966 1
                    evc = circuit
967 1
                    break
968 1
            if found_schedule:
969 1
                break
970 1
        return evc, found_schedule
971
972 1
    def _get_circuits_buffer(self):
973
        """
974
        Return the circuit buffer.
975
976
        If the buffer is empty, try to load data from mongodb.
977
        """
978 1
        if not self.circuits:
979
            # Load circuits from mongodb to buffer
980 1
            circuits = self.mongo_controller.get_circuits()['circuits']
981 1
            for c_id, circuit in circuits.items():
982 1
                evc = self._evc_from_dict(circuit)
983 1
                self.circuits[c_id] = evc
984 1
        return self.circuits
985
986
    # pylint: disable=attribute-defined-outside-init
987 1
    @alisten_to("kytos/of_multi_table.enable_table")
988 1
    async def on_table_enabled(self, event):
989
        """Handle a recently table enabled."""
990 1
        table_group = event.content.get("mef_eline", None)
991 1
        if not table_group:
992
            return
993 1
        for group in table_group:
994 1
            if group not in settings.TABLE_GROUP_ALLOWED:
995 1
                log.error(f'The table group "{group}" is not allowed for '
996
                          f'mef_eline. Allowed table groups are '
997
                          f'{settings.TABLE_GROUP_ALLOWED}')
998 1
                return
999 1
        self.table_group.update(table_group)
1000 1
        content = {"group_table": self.table_group}
1001 1
        name = "kytos/mef_eline.enable_table"
1002
        await aemit_event(self.controller, name, content)
1003