Passed
Pull Request — master (#396)
by
unknown
03:43
created

build.main.Main._is_duplicated_evc()   A

Complexity

Conditions 5

Size

Total Lines 15
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 5

Importance

Changes 0
Metric Value
cc 5
eloc 6
nop 2
dl 0
loc 15
ccs 5
cts 5
cp 1
crap 5
rs 9.3333
c 0
b 0
f 0
1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from threading import Lock
10
11 1
from pydantic import ValidationError
12
13 1
from kytos.core import KytosNApp, log, rest
14 1
from kytos.core.events import KytosEvent
15 1
from kytos.core.exceptions import KytosTagError
16 1
from kytos.core.helpers import (alisten_to, listen_to, load_spec,
17
                                validate_openapi)
18 1
from kytos.core.interface import TAG, UNI, TAGRange
19 1
from kytos.core.link import Link
20 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
21
                                 get_json_or_400)
22 1
from kytos.core.tag_ranges import get_tag_ranges
23 1
from napps.kytos.mef_eline import controllers, settings
24 1
from napps.kytos.mef_eline.exceptions import DisabledSwitch, InvalidPath
25 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
26
                                          Path)
27 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
28 1
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
29
                                         emit_event, get_vlan_tags_and_masks,
30
                                         map_evc_event_content)
31
32
33
# pylint: disable=too-many-public-methods
34 1
class Main(KytosNApp):
35
    """Main class of amlight/mef_eline NApp.
36
37
    This class is the entry point for this napp.
38
    """
39
40 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
41
42 1
    def setup(self):
43
        """Replace the '__init__' method for the KytosNApp subclass.
44
45
        The setup method is automatically called by the controller when your
46
        application is loaded.
47
48
        So, if you have any setup routine, insert it here.
49
        """
50
        # object used to scheduler circuit events
51 1
        self.sched = Scheduler()
52
53
        # object to save and load circuits
54 1
        self.mongo_controller = self.get_eline_controller()
55 1
        self.mongo_controller.bootstrap_indexes()
56
57
        # set the controller that will manager the dynamic paths
58 1
        DynamicPathManager.set_controller(self.controller)
59
60
        # dictionary of EVCs created. It acts as a circuit buffer.
61
        # Every create/update/delete must be synced to mongodb.
62 1
        self.circuits = {}
63
64 1
        self.table_group = {"epl": 0, "evpl": 0}
65 1
        self._lock = Lock()
66 1
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)
67
68 1
        self.load_all_evcs()
69 1
        self._topology_updated_at = None
70
71 1
    def get_evcs_by_svc_level(self) -> list:
72
        """Get circuits sorted by desc service level and asc creation_time.
73
74
        In the future, as more ops are offloaded it should be get from the DB.
75
        """
76 1
        return sorted(self.circuits.values(),
77
                      key=lambda x: (-x.service_level, x.creation_time))
78
79 1
    @staticmethod
80 1
    def get_eline_controller():
81
        """Return the ELineController instance."""
82
        return controllers.ELineController()
83
84 1
    def execute(self):
85
        """Execute once when the napp is running."""
86 1
        if self._lock.locked():
87 1
            return
88 1
        log.debug("Starting consistency routine")
89 1
        with self._lock:
90 1
            self.execute_consistency()
91 1
        log.debug("Finished consistency routine")
92
93 1
    def should_be_checked(self, circuit):
94
        "Verify if the circuit meets the necessary conditions to be checked"
95
        # pylint: disable=too-many-boolean-expressions
96 1
        if (
97
                circuit.is_enabled()
98
                and not circuit.is_active()
99
                and not circuit.lock.locked()
100
                and not circuit.has_recent_removed_flow()
101
                and not circuit.is_recent_updated()
102
                and circuit.are_unis_active(self.controller.switches)
103
                # if a inter-switch EVC does not have current_path, it does not
104
                # make sense to run sdntrace on it
105
                and (circuit.is_intra_switch() or circuit.current_path)
106
                ):
107 1
            return True
108
        return False
109
110 1
    def execute_consistency(self):
111
        """Execute consistency routine."""
112 1
        circuits_to_check = []
113 1
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
114 1
        for circuit in self.get_evcs_by_svc_level():
115 1
            stored_circuits.pop(circuit.id, None)
116 1
            if self.should_be_checked(circuit):
117 1
                circuits_to_check.append(circuit)
118 1
        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
119 1
        for circuit in circuits_to_check:
120 1
            is_checked = circuits_checked.get(circuit.id)
121 1
            if is_checked:
122 1
                circuit.execution_rounds = 0
123 1
                log.info(f"{circuit} enabled but inactive - activating")
124 1
                with circuit.lock:
125 1
                    circuit.activate()
126 1
                    circuit.sync()
127
            else:
128 1
                circuit.execution_rounds += 1
129 1
                if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
130 1
                    log.info(f"{circuit} enabled but inactive - redeploy")
131 1
                    with circuit.lock:
132 1
                        circuit.deploy()
133 1
        for circuit_id in stored_circuits:
134 1
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
135 1
            self._load_evc(stored_circuits[circuit_id])
136
137 1
    def shutdown(self):
138
        """Execute when your napp is unloaded.
139
140
        If you have some cleanup procedure, insert it here.
141
        """
142
143 1
    @rest("/v2/evc/", methods=["GET"])
144 1
    def list_circuits(self, request: Request) -> JSONResponse:
145
        """Endpoint to return circuits stored.
146
147
        archive query arg if defined (not null) will be filtered
148
        accordingly, by default only non archived evcs will be listed
149
        """
150 1
        log.debug("list_circuits /v2/evc")
151 1
        args = request.query_params
152 1
        archived = args.get("archived", "false").lower()
153 1
        args = {k: v for k, v in args.items() if k not in {"archived"}}
154 1
        circuits = self.mongo_controller.get_circuits(archived=archived,
155
                                                      metadata=args)
156 1
        circuits = circuits['circuits']
157 1
        return JSONResponse(circuits)
158
159 1
    @rest("/v2/evc/schedule", methods=["GET"])
160 1
    def list_schedules(self, _request: Request) -> JSONResponse:
161
        """Endpoint to return all schedules stored for all circuits.
162
163
        Return a JSON with the following template:
164
        [{"schedule_id": <schedule_id>,
165
         "circuit_id": <circuit_id>,
166
         "schedule": <schedule object>}]
167
        """
168 1
        log.debug("list_schedules /v2/evc/schedule")
169 1
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
170 1
        if not circuits:
171 1
            result = {}
172 1
            status = 200
173 1
            return JSONResponse(result, status_code=status)
174
175 1
        result = []
176 1
        status = 200
177 1
        for circuit in circuits:
178 1
            circuit_scheduler = circuit.get("circuit_scheduler")
179 1
            if circuit_scheduler:
180 1
                for scheduler in circuit_scheduler:
181 1
                    value = {
182
                        "schedule_id": scheduler.get("id"),
183
                        "circuit_id": circuit.get("id"),
184
                        "schedule": scheduler,
185
                    }
186 1
                    result.append(value)
187
188 1
        log.debug("list_schedules result %s %s", result, status)
189 1
        return JSONResponse(result, status_code=status)
190
191 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
192 1
    def get_circuit(self, request: Request) -> JSONResponse:
193
        """Endpoint to return a circuit based on id."""
194 1
        circuit_id = request.path_params["circuit_id"]
195 1
        log.debug("get_circuit /v2/evc/%s", circuit_id)
196 1
        circuit = self.mongo_controller.get_circuit(circuit_id)
197 1
        if not circuit:
198 1
            result = f"circuit_id {circuit_id} not found"
199 1
            log.debug("get_circuit result %s %s", result, 404)
200 1
            raise HTTPException(404, detail=result)
201 1
        status = 200
202 1
        log.debug("get_circuit result %s %s", circuit, status)
203 1
        return JSONResponse(circuit, status_code=status)
204
205
    # pylint: disable=too-many-branches, too-many-statements
206 1
    @rest("/v2/evc/", methods=["POST"])
207 1
    @validate_openapi(spec)
208 1
    def create_circuit(self, request: Request) -> JSONResponse:
209
        """Try to create a new circuit.
210
211
        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
212
        UNI_Z's requested C-VID are available from the interfaces' pools. This
213
        is checked when creating the UNI object.
214
215
        Then, E-Line NApp requests a primary and a backup path to the
216
        Pathfinder NApp using the attributes primary_links and backup_links
217
        submitted via REST
218
219
        # For each link composing paths in #3:
220
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
221
        #  - Using the S-VID obtained, generate abstract flow entries to be
222
        #    sent to FlowManager
223
224
        Push abstract flow entries to FlowManager and FlowManager pushes
225
        OpenFlow entries to datapaths
226
227
        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
228
        creation
229
230
        Finnaly, notify user of the status of its request.
231
        """
232
        # Try to create the circuit object
233 1
        log.debug("create_circuit /v2/evc/")
234 1
        data = get_json_or_400(request, self.controller.loop)
235
236 1
        try:
237 1
            evc = self._evc_from_dict(data)
238 1
        except ValueError as exception:
239 1
            log.debug("create_circuit result %s %s", exception, 400)
240 1
            raise HTTPException(400, detail=str(exception)) from exception
241 1
        except KytosTagError as exception:
242 1
            log.debug("create_circuit result %s %s", exception, 400)
243 1
            raise HTTPException(400, detail=str(exception)) from exception
244 1
        try:
245 1
            check_disabled_component(evc.uni_a, evc.uni_z)
246 1
        except DisabledSwitch as exception:
247 1
            log.debug("create_circuit result %s %s", exception, 409)
248 1
            raise HTTPException(
249
                    409,
250
                    detail=f"Path is not valid: {exception}"
251
                ) from exception
252
253 1
        if evc.primary_path:
254 1
            try:
255 1
                evc.primary_path.is_valid(
256
                    evc.uni_a.interface.switch,
257
                    evc.uni_z.interface.switch,
258
                    bool(evc.circuit_scheduler),
259
                )
260 1
            except InvalidPath as exception:
261 1
                raise HTTPException(
262
                    400,
263
                    detail=f"primary_path is not valid: {exception}"
264
                ) from exception
265 1
        if evc.backup_path:
266 1
            try:
267 1
                evc.backup_path.is_valid(
268
                    evc.uni_a.interface.switch,
269
                    evc.uni_z.interface.switch,
270
                    bool(evc.circuit_scheduler),
271
                )
272 1
            except InvalidPath as exception:
273 1
                raise HTTPException(
274
                    400,
275
                    detail=f"backup_path is not valid: {exception}"
276
                ) from exception
277
278 1
        if self._is_duplicated_evc(evc):
279 1
            result = "The EVC already exists."
280 1
            log.debug("create_circuit result %s %s", result, 409)
281 1
            raise HTTPException(409, detail=result)
282
283 1
        if not evc._tag_lists_equal():
284 1
            detail = "UNI_A and UNI_Z tag lists should be the same."
285 1
            raise HTTPException(400, detail=detail)
286
287 1
        try:
288 1
            evc._validate_has_primary_or_dynamic()
289 1
        except ValueError as exception:
290 1
            raise HTTPException(400, detail=str(exception)) from exception
291
292 1
        try:
293 1
            self._use_uni_tags(evc)
294
        except KytosTagError as exception:
295
            raise HTTPException(400, detail=str(exception)) from exception
296
297
        # save circuit
298 1
        try:
299 1
            evc.sync()
300
        except ValidationError as exception:
301
            raise HTTPException(400, detail=str(exception)) from exception
302
303
        # store circuit in dictionary
304 1
        self.circuits[evc.id] = evc
305
306
        # Schedule the circuit deploy
307 1
        self.sched.add(evc)
308
309
        # Circuit has no schedule, deploy now
310 1
        if not evc.circuit_scheduler:
311 1
            with evc.lock:
312 1
                evc.deploy()
313
314
        # Notify users
315 1
        result = {"circuit_id": evc.id}
316 1
        status = 201
317 1
        log.debug("create_circuit result %s %s", result, status)
318 1
        emit_event(self.controller, name="created",
319
                   content=map_evc_event_content(evc))
320 1
        return JSONResponse(result, status_code=status)
321
322 1
    @staticmethod
323 1
    def _use_uni_tags(evc):
324 1
        uni_a = evc.uni_a
325 1
        evc._use_uni_vlan(uni_a)
326 1
        try:
327 1
            uni_z = evc.uni_z
328 1
            evc._use_uni_vlan(uni_z)
329 1
        except KytosTagError as err:
330 1
            evc.make_uni_vlan_available(uni_a)
331 1
            raise err
332
333 1
    @listen_to('kytos/flow_manager.flow.removed')
334 1
    def on_flow_delete(self, event):
335
        """Capture delete messages to keep track when flows got removed."""
336
        self.handle_flow_delete(event)
337
338 1
    def handle_flow_delete(self, event):
339
        """Keep track when the EVC got flows removed by deriving its cookie."""
340 1
        flow = event.content["flow"]
341 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
342 1
        if evc:
343 1
            log.debug("Flow removed in EVC %s", evc.id)
344 1
            evc.set_flow_removed_at()
345
346 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
347 1
    @validate_openapi(spec)
348 1
    def update(self, request: Request) -> JSONResponse:
349
        """Update a circuit based on payload.
350
351
        The EVC attributes (creation_time, active, current_path,
352
        failover_path, _id, archived) can't be updated.
353
        """
354 1
        data = get_json_or_400(request, self.controller.loop)
355 1
        circuit_id = request.path_params["circuit_id"]
356 1
        log.debug("update /v2/evc/%s", circuit_id)
357 1
        try:
358 1
            evc = self.circuits[circuit_id]
359 1
        except KeyError:
360 1
            result = f"circuit_id {circuit_id} not found"
361 1
            log.debug("update result %s %s", result, 404)
362 1
            raise HTTPException(404, detail=result) from KeyError
363
364 1
        if evc.archived:
365 1
            result = "Can't update archived EVC"
366 1
            log.debug("update result %s %s", result, 409)
367 1
            raise HTTPException(409, detail=result)
368
369 1
        try:
370 1
            enable, redeploy = evc.update(
371
                **self._evc_dict_with_instances(data)
372
            )
373 1
        except KytosTagError as exception:
374
            raise HTTPException(400, detail=str(exception)) from exception
375 1
        except ValidationError as exception:
376
            raise HTTPException(400, detail=str(exception)) from exception
377 1
        except ValueError as exception:
378 1
            log.debug("update result %s %s", exception, 400)
379 1
            raise HTTPException(400, detail=str(exception)) from exception
380 1
        except DisabledSwitch as exception:
381 1
            log.debug("update result %s %s", exception, 409)
382 1
            raise HTTPException(
383
                    409,
384
                    detail=f"Path is not valid: {exception}"
385
                ) from exception
386
387 1
        if self._is_duplicated_evc(evc):
388
            result = "The EVC already exists."
389
            log.debug("create_circuit result %s %s", result, 409)
390
            raise HTTPException(409, detail=result)
391
392 1
        if evc.is_active():
393
            if enable is False:  # disable if active
394
                with evc.lock:
395
                    evc.remove()
396
            elif redeploy is not None:  # redeploy if active
397
                with evc.lock:
398
                    evc.remove()
399
                    evc.deploy()
400
        else:
401 1
            if enable is True:  # enable if inactive
402 1
                with evc.lock:
403 1
                    evc.deploy()
404 1
        result = {evc.id: evc.as_dict()}
405 1
        status = 200
406
407 1
        log.debug("update result %s %s", result, status)
408 1
        emit_event(self.controller, "updated",
409
                   content=map_evc_event_content(evc, **data))
410 1
        return JSONResponse(result, status_code=status)
411
412 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
413 1
    def delete_circuit(self, request: Request) -> JSONResponse:
414
        """Remove a circuit.
415
416
        First, the flows are removed from the switches, and then the EVC is
417
        disabled.
418
        """
419 1
        circuit_id = request.path_params["circuit_id"]
420 1
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
421 1
        try:
422 1
            evc = self.circuits[circuit_id]
423 1
        except KeyError:
424 1
            result = f"circuit_id {circuit_id} not found"
425 1
            log.debug("delete_circuit result %s %s", result, 404)
426 1
            raise HTTPException(404, detail=result) from KeyError
427
428 1
        if evc.archived:
429 1
            result = f"Circuit {circuit_id} already removed"
430 1
            log.debug("delete_circuit result %s %s", result, 404)
431 1
            raise HTTPException(404, detail=result)
432
433 1
        log.info("Removing %s", evc)
434 1
        with evc.lock:
435 1
            evc.remove_current_flows()
436 1
            evc.remove_failover_flows(sync=False)
437 1
            evc.deactivate()
438 1
            evc.disable()
439 1
            self.sched.remove(evc)
440 1
            evc.archive()
441 1
            evc.remove_uni_tags()
442 1
            evc.sync()
443 1
        log.info("EVC removed. %s", evc)
444 1
        result = {"response": f"Circuit {circuit_id} removed"}
445 1
        status = 200
446
447 1
        log.debug("delete_circuit result %s %s", result, status)
448 1
        emit_event(self.controller, "deleted",
449
                   content=map_evc_event_content(evc))
450 1
        return JSONResponse(result, status_code=status)
451
452 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
453 1
    def get_metadata(self, request: Request) -> JSONResponse:
454
        """Get metadata from an EVC."""
455 1
        circuit_id = request.path_params["circuit_id"]
456 1
        try:
457 1
            return (
458
                JSONResponse({"metadata": self.circuits[circuit_id].metadata})
459
            )
460
        except KeyError as error:
461
            raise HTTPException(
462
                404,
463
                detail=f"circuit_id {circuit_id} not found."
464
            ) from error
465
466 1 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
467 1
    @validate_openapi(spec)
468 1
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
469
        """Add metadata to a bulk of EVCs."""
470 1
        data = get_json_or_400(request, self.controller.loop)
471 1
        circuit_ids = data.pop("circuit_ids")
472
473 1
        self.mongo_controller.update_evcs(circuit_ids, data, "add")
474
475 1
        fail_evcs = []
476 1
        for _id in circuit_ids:
477 1
            try:
478 1
                evc = self.circuits[_id]
479 1
                evc.extend_metadata(data)
480 1
            except KeyError:
481 1
                fail_evcs.append(_id)
482
483 1
        if fail_evcs:
484 1
            raise HTTPException(404, detail=fail_evcs)
485 1
        return JSONResponse("Operation successful", status_code=201)
486
487 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
488 1
    @validate_openapi(spec)
489 1
    def add_metadata(self, request: Request) -> JSONResponse:
490
        """Add metadata to an EVC."""
491 1
        circuit_id = request.path_params["circuit_id"]
492 1
        metadata = get_json_or_400(request, self.controller.loop)
493 1
        if not isinstance(metadata, dict):
494
            raise HTTPException(400, "Invalid metadata value: {metadata}")
495 1
        try:
496 1
            evc = self.circuits[circuit_id]
497 1
        except KeyError as error:
498 1
            raise HTTPException(
499
                404,
500
                detail=f"circuit_id {circuit_id} not found."
501
            ) from error
502
503 1
        evc.extend_metadata(metadata)
504 1
        evc.sync()
505 1
        return JSONResponse("Operation successful", status_code=201)
506
507 1 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
508 1
    @validate_openapi(spec)
509 1
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
510
        """Delete metada from a bulk of EVCs"""
511 1
        data = get_json_or_400(request, self.controller.loop)
512 1
        key = request.path_params["key"]
513 1
        circuit_ids = data.pop("circuit_ids")
514 1
        self.mongo_controller.update_evcs(circuit_ids, {key: ""}, "del")
515
516 1
        fail_evcs = []
517 1
        for _id in circuit_ids:
518 1
            try:
519 1
                evc = self.circuits[_id]
520 1
                evc.remove_metadata(key)
521 1
            except KeyError:
522 1
                fail_evcs.append(_id)
523
524 1
        if fail_evcs:
525 1
            raise HTTPException(404, detail=fail_evcs)
526 1
        return JSONResponse("Operation successful")
527
528 1
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
529 1
    def delete_metadata(self, request: Request) -> JSONResponse:
530
        """Delete metadata from an EVC."""
531 1
        circuit_id = request.path_params["circuit_id"]
532 1
        key = request.path_params["key"]
533 1
        try:
534 1
            evc = self.circuits[circuit_id]
535 1
        except KeyError as error:
536 1
            raise HTTPException(
537
                404,
538
                detail=f"circuit_id {circuit_id} not found."
539
            ) from error
540
541 1
        evc.remove_metadata(key)
542 1
        evc.sync()
543 1
        return JSONResponse("Operation successful")
544
545 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
546 1
    def redeploy(self, request: Request) -> JSONResponse:
547
        """Endpoint to force the redeployment of an EVC."""
548 1
        circuit_id = request.path_params["circuit_id"]
549 1
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
550 1
        try:
551 1
            evc = self.circuits[circuit_id]
552 1
        except KeyError:
553 1
            raise HTTPException(
554
                404,
555
                detail=f"circuit_id {circuit_id} not found"
556
            ) from KeyError
557 1
        if evc.is_enabled():
558 1
            with evc.lock:
559 1
                evc.remove_current_flows()
560 1
                evc.deploy()
561 1
            result = {"response": f"Circuit {circuit_id} redeploy received."}
562 1
            status = 202
563
        else:
564 1
            result = {"response": f"Circuit {circuit_id} is disabled."}
565 1
            status = 409
566
567 1
        return JSONResponse(result, status_code=status)
568
569 1
    @rest("/v2/evc/schedule/", methods=["POST"])
570 1
    @validate_openapi(spec)
571 1
    def create_schedule(self, request: Request) -> JSONResponse:
572
        """
573
        Create a new schedule for a given circuit.
574
575
        This service do no check if there are conflicts with another schedule.
576
        Payload example:
577
            {
578
              "circuit_id":"aa:bb:cc",
579
              "schedule": {
580
                "date": "2019-08-07T14:52:10.967Z",
581
                "interval": "string",
582
                "frequency": "1 * * * *",
583
                "action": "create"
584
              }
585
            }
586
        """
587 1
        log.debug("create_schedule /v2/evc/schedule/")
588 1
        data = get_json_or_400(request, self.controller.loop)
589 1
        circuit_id = data["circuit_id"]
590 1
        schedule_data = data["schedule"]
591
592
        # Get EVC from circuits buffer
593 1
        circuits = self._get_circuits_buffer()
594
595
        # get the circuit
596 1
        evc = circuits.get(circuit_id)
597
598
        # get the circuit
599 1
        if not evc:
600 1
            result = f"circuit_id {circuit_id} not found"
601 1
            log.debug("create_schedule result %s %s", result, 404)
602 1
            raise HTTPException(404, detail=result)
603
        # Can not modify circuits deleted and archived
604 1
        if evc.archived:
605 1
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
606 1
            log.debug("create_schedule result %s %s", result, 409)
607 1
            raise HTTPException(409, detail=result)
608
609
        # new schedule from dict
610 1
        new_schedule = CircuitSchedule.from_dict(schedule_data)
611
612
        # If there is no schedule, create the list
613 1
        if not evc.circuit_scheduler:
614 1
            evc.circuit_scheduler = []
615
616
        # Add the new schedule
617 1
        evc.circuit_scheduler.append(new_schedule)
618
619
        # Add schedule job
620 1
        self.sched.add_circuit_job(evc, new_schedule)
621
622
        # save circuit to mongodb
623 1
        evc.sync()
624
625 1
        result = new_schedule.as_dict()
626 1
        status = 201
627
628 1
        log.debug("create_schedule result %s %s", result, status)
629 1
        return JSONResponse(result, status_code=status)
630
631 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
632 1
    @validate_openapi(spec)
633 1
    def update_schedule(self, request: Request) -> JSONResponse:
634
        """Update a schedule.
635
636
        Change all attributes from the given schedule from a EVC circuit.
637
        The schedule ID is preserved as default.
638
        Payload example:
639
            {
640
              "date": "2019-08-07T14:52:10.967Z",
641
              "interval": "string",
642
              "frequency": "1 * * *",
643
              "action": "create"
644
            }
645
        """
646 1
        data = get_json_or_400(request, self.controller.loop)
647 1
        schedule_id = request.path_params["schedule_id"]
648 1
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)
649
650
        # Try to find a circuit schedule
651 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
652
653
        # Can not modify circuits deleted and archived
654 1
        if not found_schedule:
655 1
            result = f"schedule_id {schedule_id} not found"
656 1
            log.debug("update_schedule result %s %s", result, 404)
657 1
            raise HTTPException(404, detail=result)
658 1
        if evc.archived:
659 1
            result = f"Circuit {evc.id} is archived. Update is forbidden."
660 1
            log.debug("update_schedule result %s %s", result, 409)
661 1
            raise HTTPException(409, detail=result)
662
663 1
        new_schedule = CircuitSchedule.from_dict(data)
664 1
        new_schedule.id = found_schedule.id
665
        # Remove the old schedule
666 1
        evc.circuit_scheduler.remove(found_schedule)
667
        # Append the modified schedule
668 1
        evc.circuit_scheduler.append(new_schedule)
669
670
        # Cancel all schedule jobs
671 1
        self.sched.cancel_job(found_schedule.id)
672
        # Add the new circuit schedule
673 1
        self.sched.add_circuit_job(evc, new_schedule)
674
        # Save EVC to mongodb
675 1
        evc.sync()
676
677 1
        result = new_schedule.as_dict()
678 1
        status = 200
679
680 1
        log.debug("update_schedule result %s %s", result, status)
681 1
        return JSONResponse(result, status_code=status)
682
683 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
684 1
    def delete_schedule(self, request: Request) -> JSONResponse:
685
        """Remove a circuit schedule.
686
687
        Remove the Schedule from EVC.
688
        Remove the Schedule from cron job.
689
        Save the EVC to the Storehouse.
690
        """
691 1
        schedule_id = request.path_params["schedule_id"]
692 1
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
693 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
694
695
        # Can not modify circuits deleted and archived
696 1
        if not found_schedule:
697 1
            result = f"schedule_id {schedule_id} not found"
698 1
            log.debug("delete_schedule result %s %s", result, 404)
699 1
            raise HTTPException(404, detail=result)
700
701 1
        if evc.archived:
702 1
            result = f"Circuit {evc.id} is archived. Update is forbidden."
703 1
            log.debug("delete_schedule result %s %s", result, 409)
704 1
            raise HTTPException(409, detail=result)
705
706
        # Remove the old schedule
707 1
        evc.circuit_scheduler.remove(found_schedule)
708
709
        # Cancel all schedule jobs
710 1
        self.sched.cancel_job(found_schedule.id)
711
        # Save EVC to mongodb
712 1
        evc.sync()
713
714 1
        result = "Schedule removed"
715 1
        status = 200
716
717 1
        log.debug("delete_schedule result %s %s", result, status)
718 1
        return JSONResponse(result, status_code=status)
719
720 1
    def _is_duplicated_evc(self, evc):
721
        """Verify if the circuit given is duplicated with the stored evcs.
722
723
        Args:
724
            evc (EVC): circuit to be analysed.
725
726
        Returns:
727
            boolean: True if the circuit is duplicated, otherwise False.
728
729
        """
730 1
        for circuit in tuple(self.circuits.values()):
731 1
            if (not circuit.archived and circuit._id != evc._id
732
                    and circuit.shares_uni(evc)):
733 1
                return True
734 1
        return False
735
736 1
    @listen_to("kytos/topology.link_up")
737 1
    def on_link_up(self, event):
738
        """Change circuit when link is up or end_maintenance."""
739
        self.handle_link_up(event)
740
741 1
    def handle_link_up(self, event):
742
        """Change circuit when link is up or end_maintenance."""
743 1
        log.info("Event handle_link_up %s", event.content["link"])
744 1
        for evc in self.get_evcs_by_svc_level():
745 1
            if evc.is_enabled() and not evc.archived:
746 1
                with evc.lock:
747 1
                    evc.handle_link_up(event.content["link"])
748
749
    # Possibly replace this with interruptions?
750 1
    @listen_to(
751
        '.*.switch.interface.(link_up|link_down|created|deleted)',
752
        pool='dynamic_single'
753
    )
754 1
    def on_interface_link_change(self, event: KytosEvent):
755
        """
756
        Handler for interface link_up and link_down events
757
        """
758
        with self._lock:
759
            _, _, event_type = event.name.rpartition('.')
760
            iface = event.content.get("interface")
761
            if event_type in ('link_up', 'created'):
762
                self.handle_interface_link_up(iface)
763
            elif event_type in ('link_down', 'deleted'):
764
                self.handle_interface_link_down(iface)
765
766 1
    def handle_interface_link_up(self, interface):
767
        """
768
        Handler for interface link_up events
769
        """
770
        for evc in self.get_evcs_by_svc_level():
771
            log.info("Event handle_interface_link_up %s", interface)
772
            evc.handle_interface_link_up(
773
                interface
774
            )
775
776 1
    def handle_interface_link_down(self, interface):
777
        """
778
        Handler for interface link_down events
779
        """
780
        for evc in self.get_evcs_by_svc_level():
781
            log.info("Event handle_interface_link_down %s", interface)
782
            evc.handle_interface_link_down(
783
                interface
784
            )
785
786 1
    @listen_to("kytos/topology.link_down")
787 1
    def on_link_down(self, event):
788
        """Change circuit when link is down or under_mantenance."""
789
        self.handle_link_down(event)
790
791 1
    def handle_link_down(self, event):  # pylint: disable=too-many-branches
792
        """Change circuit when link is down or under_mantenance."""
793 1
        link = event.content["link"]
794 1
        log.info("Event handle_link_down %s", link)
795 1
        switch_flows = {}
796 1
        evcs_with_failover = []
797 1
        evcs_normal = []
798 1
        check_failover = []
799 1
        for evc in self.get_evcs_by_svc_level():
800 1
            if evc.is_affected_by_link(link):
801
                # if there is no failover path, handles link down the
802
                # tradditional way
803 1
                if (
804
                    not getattr(evc, 'failover_path', None) or
805
                    evc.is_failover_path_affected_by_link(link)
806
                ):
807 1
                    evcs_normal.append(evc)
808 1
                    continue
809 1
                try:
810 1
                    dpid_flows = evc.get_failover_flows()
811
                # pylint: disable=broad-except
812 1
                except Exception:
813 1
                    err = traceback.format_exc().replace("\n", ", ")
814 1
                    log.error(
815
                        f"Ignore Failover path for {evc} due to error: {err}"
816
                    )
817 1
                    evcs_normal.append(evc)
818 1
                    continue
819 1
                for dpid, flows in dpid_flows.items():
820 1
                    switch_flows.setdefault(dpid, [])
821 1
                    switch_flows[dpid].extend(flows)
822 1
                evcs_with_failover.append(evc)
823
            else:
824 1
                check_failover.append(evc)
825
826 1
        while switch_flows:
827 1
            offset = settings.BATCH_SIZE or None
828 1
            switches = list(switch_flows.keys())
829 1
            for dpid in switches:
830 1
                emit_event(
831
                    self.controller,
832
                    context="kytos.flow_manager",
833
                    name="flows.install",
834
                    content={
835
                        "dpid": dpid,
836
                        "flow_dict": {"flows": switch_flows[dpid][:offset]},
837
                    }
838
                )
839 1
                if offset is None or offset >= len(switch_flows[dpid]):
840 1
                    del switch_flows[dpid]
841 1
                    continue
842 1
                switch_flows[dpid] = switch_flows[dpid][offset:]
843 1
            time.sleep(settings.BATCH_INTERVAL)
844
845 1
        for evc in evcs_with_failover:
846 1
            with evc.lock:
847 1
                old_path = evc.current_path
848 1
                evc.current_path = evc.failover_path
849 1
                evc.failover_path = old_path
850 1
                evc.sync()
851 1
            emit_event(self.controller, "redeployed_link_down",
852
                       content=map_evc_event_content(evc))
853 1
            log.info(
854
                f"{evc} redeployed with failover due to link down {link.id}"
855
            )
856
857 1
        for evc in evcs_normal:
858 1
            emit_event(
859
                self.controller,
860
                "evc_affected_by_link_down",
861
                content={"link_id": link.id} | map_evc_event_content(evc)
862
            )
863
864
        # After handling the hot path, check if new failover paths are needed.
865
        # Note that EVCs affected by link down will generate a KytosEvent for
866
        # deployed|redeployed, which will trigger the failover path setup.
867
        # Thus, we just need to further check the check_failover list
868 1
        for evc in check_failover:
869 1
            if evc.is_failover_path_affected_by_link(link):
870 1
                with evc.lock:
871 1
                    evc.setup_failover_path()
872
873 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
874 1
    def on_evc_affected_by_link_down(self, event):
875
        """Change circuit when link is down or under_mantenance."""
876
        self.handle_evc_affected_by_link_down(event)
877
878 1
    def handle_evc_affected_by_link_down(self, event):
879
        """Change circuit when link is down or under_mantenance."""
880 1
        evc = self.circuits.get(event.content["evc_id"])
881 1
        link_id = event.content['link_id']
882 1
        if not evc:
883 1
            return
884 1
        with evc.lock:
885 1
            result = evc.handle_link_down()
886 1
        event_name = "error_redeploy_link_down"
887 1
        if result:
888 1
            log.info(f"{evc} redeployed due to link down {link_id}")
889 1
            event_name = "redeployed_link_down"
890 1
        emit_event(self.controller, event_name,
891
                   content=map_evc_event_content(evc))
892
893 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
894 1
    def on_evc_deployed(self, event):
895
        """Handle EVC deployed|redeployed_link_down."""
896
        self.handle_evc_deployed(event)
897
898 1
    def handle_evc_deployed(self, event):
899
        """Setup failover path on evc deployed."""
900 1
        evc = self.circuits.get(event.content["evc_id"])
901 1
        if not evc:
902 1
            return
903 1
        with evc.lock:
904 1
            evc.setup_failover_path()
905
906 1
    @listen_to("kytos/topology.topology_loaded")
907 1
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
908
        """Load EVCs once the topology is available."""
909
        self.load_all_evcs()
910
911 1
    def load_all_evcs(self):
912
        """Try to load all EVCs on startup."""
913 1
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
914 1
        for circuit_id, circuit in circuits:
915 1
            if circuit_id not in self.circuits:
916 1
                self._load_evc(circuit)
917
918 1
    def _load_evc(self, circuit_dict):
919
        """Load one EVC from mongodb to memory."""
920 1
        try:
921 1
            evc = self._evc_from_dict(circuit_dict)
922 1
        except ValueError as exception:
923 1
            log.error(
924
                f"Could not load EVC: dict={circuit_dict} error={exception}"
925
            )
926 1
            return None
927 1
        except KytosTagError as exception:
928 1
            log.error(
929
                f"Could not load EVC: dict={circuit_dict} error={exception}"
930
            )
931 1
            return None
932 1
        if evc.archived:
933 1
            return None
934
935 1
        self.circuits.setdefault(evc.id, evc)
936 1
        self.sched.add(evc)
937 1
        return evc
938
939 1
    @listen_to("kytos/flow_manager.flow.error")
940 1
    def on_flow_mod_error(self, event):
941
        """Handle flow mod errors related to an EVC."""
942
        self.handle_flow_mod_error(event)
943
944 1
    def handle_flow_mod_error(self, event):
945
        """Handle flow mod errors related to an EVC."""
946 1
        flow = event.content["flow"]
947 1
        command = event.content.get("error_command")
948 1
        if command != "add":
949
            return
950 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
951 1
        if evc:
952 1
            with evc.lock:
953 1
                evc.remove_current_flows()
954
955 1
    def _evc_dict_with_instances(self, evc_dict):
956
        """Convert some dict values to instance of EVC classes.
957
958
        This method will convert: [UNI, Link]
959
        """
960 1
        data = evc_dict.copy()  # Do not modify the original dict
961 1
        for attribute, value in data.items():
962
            # Get multiple attributes.
963
            # Ex: uni_a, uni_z
964 1
            if "uni" in attribute:
965 1
                try:
966 1
                    data[attribute] = self._uni_from_dict(value)
967 1
                except ValueError as exception:
968 1
                    result = "Error creating UNI: Invalid value"
969 1
                    raise ValueError(result) from exception
970
971 1
            if attribute == "circuit_scheduler":
972 1
                data[attribute] = []
973 1
                for schedule in value:
974 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
975
976
            # Get multiple attributes.
977
            # Ex: primary_links,
978
            #     backup_links,
979
            #     current_links_cache,
980
            #     primary_links_cache,
981
            #     backup_links_cache
982 1
            if "links" in attribute:
983 1
                data[attribute] = [
984
                    self._link_from_dict(link) for link in value
985
                ]
986
987
            # Ex: current_path,
988
            #     primary_path,
989
            #     backup_path
990 1
            if "path" in attribute and attribute != "dynamic_backup_path":
991 1
                data[attribute] = Path(
992
                    [self._link_from_dict(link) for link in value]
993
                )
994
995 1
        return data
996
997 1
    def _evc_from_dict(self, evc_dict):
998 1
        data = self._evc_dict_with_instances(evc_dict)
999 1
        data["table_group"] = self.table_group
1000 1
        return EVC(self.controller, **data)
1001
1002 1
    def _uni_from_dict(self, uni_dict):
1003
        """Return a UNI object from python dict."""
1004 1
        if uni_dict is None:
1005 1
            return False
1006
1007 1
        interface_id = uni_dict.get("interface_id")
1008 1
        interface = self.controller.get_interface_by_id(interface_id)
1009 1
        if interface is None:
1010 1
            result = (
1011
                "Error creating UNI:"
1012
                + f"Could not instantiate interface {interface_id}"
1013
            )
1014 1
            raise ValueError(result) from ValueError
1015 1
        tag_convert = {1: "vlan"}
1016 1
        tag_dict = uni_dict.get("tag", None)
1017 1
        if tag_dict:
1018 1
            tag_type = tag_dict.get("tag_type")
1019 1
            tag_type = tag_convert.get(tag_type, tag_type)
1020 1
            tag_value = tag_dict.get("value")
1021 1
            if isinstance(tag_value, list):
1022 1
                tag_value = get_tag_ranges(tag_value)
1023 1
                mask_list = get_vlan_tags_and_masks(tag_value)
1024 1
                tag = TAGRange(tag_type, tag_value, mask_list)
1025
            else:
1026 1
                tag = TAG(tag_type, tag_value)
1027
        else:
1028 1
            tag = None
1029 1
        uni = UNI(interface, tag)
1030 1
        return uni
1031
1032 1
    def _link_from_dict(self, link_dict):
1033
        """Return a Link object from python dict."""
1034 1
        id_a = link_dict.get("endpoint_a").get("id")
1035 1
        id_b = link_dict.get("endpoint_b").get("id")
1036
1037 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
1038 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
1039 1
        if not endpoint_a:
1040 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
1041 1
            raise ValueError(error_msg)
1042 1
        if not endpoint_b:
1043
            error_msg = f"Could not get interface endpoint_b id {id_b}"
1044
            raise ValueError(error_msg)
1045
1046 1
        link = Link(endpoint_a, endpoint_b)
1047 1
        if "metadata" in link_dict:
1048 1
            link.extend_metadata(link_dict.get("metadata"))
1049
1050 1
        s_vlan = link.get_metadata("s_vlan")
1051 1
        if s_vlan:
1052 1
            tag = TAG.from_dict(s_vlan)
1053 1
            if tag is False:
1054
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
1055
                raise ValueError(error_msg)
1056 1
            link.update_metadata("s_vlan", tag)
1057 1
        return link
1058
1059 1
    def _find_evc_by_schedule_id(self, schedule_id):
1060
        """
1061
        Find an EVC and CircuitSchedule based on schedule_id.
1062
1063
        :param schedule_id: Schedule ID
1064
        :return: EVC and Schedule
1065
        """
1066 1
        circuits = self._get_circuits_buffer()
1067 1
        found_schedule = None
1068 1
        evc = None
1069
1070
        # pylint: disable=unused-variable
1071 1
        for c_id, circuit in circuits.items():
1072 1
            for schedule in circuit.circuit_scheduler:
1073 1
                if schedule.id == schedule_id:
1074 1
                    found_schedule = schedule
1075 1
                    evc = circuit
1076 1
                    break
1077 1
            if found_schedule:
1078 1
                break
1079 1
        return evc, found_schedule
1080
1081 1
    def _get_circuits_buffer(self):
1082
        """
1083
        Return the circuit buffer.
1084
1085
        If the buffer is empty, try to load data from mongodb.
1086
        """
1087 1
        if not self.circuits:
1088
            # Load circuits from mongodb to buffer
1089 1
            circuits = self.mongo_controller.get_circuits()['circuits']
1090 1
            for c_id, circuit in circuits.items():
1091 1
                evc = self._evc_from_dict(circuit)
1092 1
                self.circuits[c_id] = evc
1093 1
        return self.circuits
1094
1095
    # pylint: disable=attribute-defined-outside-init
1096 1
    @alisten_to("kytos/of_multi_table.enable_table")
1097 1
    async def on_table_enabled(self, event):
1098
        """Handle a recently table enabled."""
1099 1
        table_group = event.content.get("mef_eline", None)
1100 1
        if not table_group:
1101 1
            return
1102 1
        for group in table_group:
1103 1
            if group not in settings.TABLE_GROUP_ALLOWED:
1104 1
                log.error(f'The table group "{group}" is not allowed for '
1105
                          f'mef_eline. Allowed table groups are '
1106
                          f'{settings.TABLE_GROUP_ALLOWED}')
1107 1
                return
1108 1
        self.table_group.update(table_group)
1109 1
        content = {"group_table": self.table_group}
1110 1
        name = "kytos/mef_eline.enable_table"
1111
        await aemit_event(self.controller, name, content)
1112