Passed
Pull Request — master (#438)
by Italo Valcy
08:11
created

build.main.Main.handle_cleanup_evcs_old_path()   A

Complexity

Conditions 3

Size

Total Lines 8
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 3

Importance

Changes 0
Metric Value
cc 3
eloc 7
nop 2
dl 0
loc 8
ccs 7
cts 7
cp 1
crap 3
rs 10
c 0
b 0
f 0
1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from collections import defaultdict
10 1
from threading import Lock
11 1
from typing import Optional
12
13 1
from pydantic import ValidationError
14
15 1
from kytos.core import KytosNApp, log, rest
16 1
from kytos.core.events import KytosEvent
17 1
from kytos.core.exceptions import KytosTagError
18 1
from kytos.core.helpers import (alisten_to, listen_to, load_spec, now,
19
                                validate_openapi)
20 1
from kytos.core.interface import TAG, UNI, TAGRange
21 1
from kytos.core.link import Link
22 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
23
                                 get_json_or_400)
24 1
from kytos.core.tag_ranges import get_tag_ranges
25 1
from napps.kytos.mef_eline import controllers, settings
26 1
from napps.kytos.mef_eline.exceptions import (DisabledSwitch,
27
                                              DuplicatedNoTagUNI, InvalidPath)
28 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
29
                                          Path)
30 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
31 1
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
32
                                         emit_event, get_vlan_tags_and_masks,
33
                                         map_evc_event_content)
34
35
36
# pylint: disable=too-many-public-methods
37 1
class Main(KytosNApp):
38
    """Main class of amlight/mef_eline NApp.
39
40
    This class is the entry point for this napp.
41
    """
42
43 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
44
45 1
    def setup(self):
        """Initialise the NApp state; stands in for ``__init__``.

        Creates the scheduler and the E-Line controller, prepares the
        in-memory bookkeeping attributes, registers the execute loop with
        ``settings.DEPLOY_EVCS_INTERVAL`` and finally loads all EVCs.
        """
        # Scheduler used for circuit events.
        self.sched = Scheduler()

        # Controller used to save and load circuits; its indexes are
        # bootstrapped right away.
        self.mongo_controller = self.get_eline_controller()
        self.mongo_controller.bootstrap_indexes()

        # Hand the Kytos controller to the dynamic path manager.
        DynamicPathManager.set_controller(self.controller)

        # Buffer of created EVCs, keyed by circuit id.
        # Every create/update/delete must be synced to mongodb.
        self.circuits = {}

        self._intf_events = defaultdict(dict)
        self._lock_interfaces = defaultdict(Lock)
        self.table_group = {"epl": 0, "evpl": 0}
        # Guards the consistency routine run from execute().
        self._lock = Lock()
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

        self.load_all_evcs()
        self._topology_updated_at = None
75
76 1
    def get_evcs_by_svc_level(self, enable_filter: bool = True) -> list:
77
        """Get circuits sorted by desc service level and asc creation_time.
78
79
        In the future, as more ops are offloaded it should be get from the DB.
80
        """
81 1
        if enable_filter:
82 1
            return sorted(
83
                          [circuit for circuit in self.circuits.values()
84
                           if circuit.is_enabled()],
85
                          key=lambda x: (-x.service_level, x.creation_time),
86
            )
87 1
        return sorted(self.circuits.values(),
88
                      key=lambda x: (-x.service_level, x.creation_time))
89
90 1
    @staticmethod
    def get_eline_controller():
        """Build and return a new ``controllers.ELineController``."""
        eline_controller = controllers.ELineController()
        return eline_controller
94
95 1
    def execute(self):
        """Run one round of the consistency routine.

        The round is skipped when ``self._lock`` is already held, so two
        rounds never overlap.
        """
        # A non-blocking acquire checks and takes the lock in one atomic
        # step. Checking ``locked()`` first and then entering ``with`` left
        # a window in which this call could block behind another holder
        # instead of skipping the round.
        if not self._lock.acquire(blocking=False):
            return
        try:
            log.debug("Starting consistency routine")
            self.execute_consistency()
        finally:
            self._lock.release()
        log.debug("Finished consistency routine")
103
104 1
    def should_be_checked(self, circuit):
105
        "Verify if the circuit meets the necessary conditions to be checked"
106
        # pylint: disable=too-many-boolean-expressions
107 1
        if (
108
                circuit.is_enabled()
109
                and not circuit.is_active()
110
                and not circuit.lock.locked()
111
                and not circuit.has_recent_removed_flow()
112
                and not circuit.is_recent_updated()
113
                and circuit.are_unis_active(self.controller.switches)
114
                # if a inter-switch EVC does not have current_path, it does not
115
                # make sense to run sdntrace on it
116
                and (circuit.is_intra_switch() or circuit.current_path)
117
                ):
118 1
            return True
119
        return False
120
121 1
    def execute_consistency(self):
        """Execute consistency routine.

        For every buffered circuit (disabled ones included):

        - drop it from the set of circuits fetched from mongodb, so what
          remains afterwards is stored but not loaded;
        - queue it for a trace check when ``should_be_checked`` agrees;
        - set up a failover path when it has none and it was not affected
          by a link event within the last ``DEPLOY_EVCS_INTERVAL`` seconds.

        Queued circuits whose trace check succeeds are activated and synced.
        The others have ``execution_rounds`` incremented and are redeployed
        once that exceeds ``settings.WAIT_FOR_OLD_PATH``. Finally, circuits
        found only in mongodb are loaded.
        """
        circuits_to_check = []
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
        for circuit in self.get_evcs_by_svc_level(enable_filter=False):
            stored_circuits.pop(circuit.id, None)
            if self.should_be_checked(circuit):
                circuits_to_check.append(circuit)
            # setup failover_path whenever possible
            if getattr(circuit, "failover_path", None):
                continue
            affected_at = getattr(circuit, "affected_by_link_at", None)
            # total_seconds() covers the whole elapsed time; ``.seconds`` is
            # only the sub-day component and wraps around every 24 hours.
            if (
                not affected_at or
                (now() - affected_at).total_seconds()
                >= settings.DEPLOY_EVCS_INTERVAL
            ):
                with circuit.lock:
                    circuit.setup_failover_path()
        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
        for circuit in circuits_to_check:
            is_checked = circuits_checked.get(circuit.id)
            if is_checked:
                circuit.execution_rounds = 0
                log.info(f"{circuit} enabled but inactive - activating")
                with circuit.lock:
                    circuit.activate()
                    circuit.sync()
            else:
                circuit.execution_rounds += 1
                if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
                    log.info(f"{circuit} enabled but inactive - redeploy")
                    with circuit.lock:
                        circuit.deploy()
        for circuit_id, stored_circuit in stored_circuits.items():
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
            self._load_evc(stored_circuit)
157
158 1
    def shutdown(self):
        """Run when the NApp is unloaded.

        Nothing needs cleaning up at the moment; add teardown steps here.
        """
163
164 1
    @rest("/v2/evc/", methods=["GET"])
    def list_circuits(self, request: Request) -> JSONResponse:
        """Endpoint to return the stored circuits.

        The ``archived`` query argument (lower-cased, "false" when absent)
        is passed to the controller as the archived filter. Every other
        query argument is passed on as a metadata filter.
        """
        log.debug("list_circuits /v2/evc")
        query = request.query_params
        archived = query.get("archived", "false").lower()
        metadata = {
            key: value for key, value in query.items() if key != "archived"
        }
        stored = self.mongo_controller.get_circuits(archived=archived,
                                                    metadata=metadata)
        return JSONResponse(stored['circuits'])
179
180 1
    @rest("/v2/evc/schedule", methods=["GET"])
    def list_schedules(self, _request: Request) -> JSONResponse:
        """Endpoint to return all schedules stored for all circuits.

        Return a JSON with the following template:
        [{"schedule_id": <schedule_id>,
         "circuit_id": <circuit_id>,
         "schedule": <schedule object>}]

        When no circuit is stored at all, an empty JSON object is returned
        instead of a list.
        """
        log.debug("list_schedules /v2/evc/schedule")
        stored = self.mongo_controller.get_circuits()['circuits'].values()
        if not stored:
            return JSONResponse({}, status_code=200)

        status = 200
        schedules = []
        for circuit in stored:
            # Circuits without schedules contribute nothing.
            for entry in circuit.get("circuit_scheduler") or ():
                schedules.append(
                    {
                        "schedule_id": entry.get("id"),
                        "circuit_id": circuit.get("id"),
                        "schedule": entry,
                    }
                )

        log.debug("list_schedules result %s %s", schedules, status)
        return JSONResponse(schedules, status_code=status)
211
212 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
    def get_circuit(self, request: Request) -> JSONResponse:
        """Endpoint to return one circuit, looked up by id in the controller.

        Raises:
            HTTPException: 404 when the controller returns nothing for the id.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("get_circuit /v2/evc/%s", circuit_id)
        found = self.mongo_controller.get_circuit(circuit_id)
        if found:
            log.debug("get_circuit result %s %s", found, 200)
            return JSONResponse(found, status_code=200)
        message = f"circuit_id {circuit_id} not found"
        log.debug("get_circuit result %s %s", message, 404)
        raise HTTPException(404, detail=message)
225
226
    # pylint: disable=too-many-branches, too-many-statements
227 1
    @rest("/v2/evc/", methods=["POST"])
    @validate_openapi(spec)
    def create_circuit(self, request: Request) -> JSONResponse:
        """Create a new EVC from the request payload.

        Steps, in order:

        1. Build the EVC from the JSON body (400 on ValueError or
           KytosTagError).
        2. Check for disabled components on both UNIs (409).
        3. Validate primary_path and backup_path when present (400).
        4. Require equal UNI tag lists and run the EVC's own
           primary-or-dynamic validation (400).
        5. Check for duplicated untagged UNIs (409), then reserve the UNI
           tags (400).
        6. Sync the EVC (400 on ValidationError), buffer it, add it to the
           scheduler and deploy it right away when it has no
           circuit_scheduler.
        7. Emit the "created" event and answer 201 with the circuit id.
        """
        log.debug("create_circuit /v2/evc/")
        payload = get_json_or_400(request, self.controller.loop)

        # Try to create the circuit object
        try:
            evc = self._evc_from_dict(payload)
        except (ValueError, KytosTagError) as err:
            log.debug("create_circuit result %s %s", err, 400)
            raise HTTPException(400, detail=str(err)) from err

        try:
            check_disabled_component(evc.uni_a, evc.uni_z)
        except DisabledSwitch as err:
            log.debug("create_circuit result %s %s", err, 409)
            raise HTTPException(
                409,
                detail=f"Path is not valid: {err}"
            ) from err

        # Static paths are validated only when they were provided.
        for path_name in ("primary_path", "backup_path"):
            path = getattr(evc, path_name)
            if not path:
                continue
            try:
                path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as err:
                raise HTTPException(
                    400,
                    detail=f"{path_name} is not valid: {err}"
                ) from err

        if not evc._tag_lists_equal():
            raise HTTPException(
                400,
                detail="UNI_A and UNI_Z tag lists should be the same."
            )

        try:
            evc._validate_has_primary_or_dynamic()
        except ValueError as err:
            raise HTTPException(400, detail=str(err)) from err

        try:
            self._check_no_tag_duplication(evc.id, evc.uni_a, evc.uni_z)
        except DuplicatedNoTagUNI as err:
            log.debug("create_circuit result %s %s", err, 409)
            raise HTTPException(409, detail=str(err)) from err

        try:
            self._use_uni_tags(evc)
        except KytosTagError as err:
            raise HTTPException(400, detail=str(err)) from err

        # save circuit
        try:
            evc.sync()
        except ValidationError as err:
            raise HTTPException(400, detail=str(err)) from err

        # store circuit in dictionary
        self.circuits[evc.id] = evc

        # Schedule the circuit deploy
        self.sched.add(evc)

        # Circuit has no schedule, deploy now
        if not evc.circuit_scheduler:
            with evc.lock:
                evc.deploy()

        # Notify users
        response = {"circuit_id": evc.id}
        log.debug("create_circuit result %s %s", response, 201)
        emit_event(self.controller, name="created",
                   content=map_evc_event_content(evc))
        return JSONResponse(response, status_code=201)
340
341 1
    @staticmethod
342 1
    def _use_uni_tags(evc):
343 1
        uni_a = evc.uni_a
344 1
        evc._use_uni_vlan(uni_a)
345 1
        try:
346 1
            uni_z = evc.uni_z
347 1
            evc._use_uni_vlan(uni_z)
348 1
        except KytosTagError as err:
349 1
            evc.make_uni_vlan_available(uni_a)
350 1
            raise err
351
352 1
    @listen_to('kytos/flow_manager.flow.removed')
    def on_flow_delete(self, event):
        """Listen for removed flows and delegate to handle_flow_delete."""
        return self.handle_flow_delete(event)
356
357 1
    def handle_flow_delete(self, event):
        """Record on the owning EVC that one of its flows was removed.

        The EVC is found in the buffer through the id derived from the
        flow cookie; flows that match no buffered EVC are ignored.
        """
        cookie = event.content["flow"].cookie
        circuit = self.circuits.get(EVC.get_id_from_cookie(cookie))
        if not circuit:
            return
        log.debug("Flow removed in EVC %s", circuit.id)
        circuit.set_flow_removed_at()
364
365 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update(self, request: Request) -> JSONResponse:
        """Update a circuit based on payload.

        The EVC attributes (creation_time, active, current_path,
        failover_path, _id, archived) can't be updated.

        After the attributes are updated, the EVC is removed and/or
        deployed depending on whether it is active and on the
        ``(enable, redeploy)`` pair returned by ``evc.update``.

        Raises:
            HTTPException: 404 when the circuit is not buffered, 400 on
                ValueError, KytosTagError or ValidationError, 409 on
                DuplicatedNoTagUNI or DisabledSwitch.
        """
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = request.path_params["circuit_id"]
        log.debug("update /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("update result %s %s", result, 404)
            # Chain the caught instance (not the KeyError class) so the
            # cause keeps the missing key.
            raise HTTPException(404, detail=result) from error

        try:
            updated_data = self._evc_dict_with_instances(data)
            self._check_no_tag_duplication(
                circuit_id, updated_data.get("uni_a"),
                updated_data.get("uni_z")
            )
            enable, redeploy = evc.update(**updated_data)
        except (ValueError, KytosTagError, ValidationError) as exception:
            log.debug("update result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception
        except DuplicatedNoTagUNI as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(409, detail=str(exception)) from exception
        except DisabledSwitch as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(
                    409,
                    detail=f"Path is not valid: {exception}"
                ) from exception

        if evc.is_active():
            if enable is False:  # disable if active
                with evc.lock:
                    evc.remove()
            elif redeploy is not None:  # redeploy if active
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        else:
            if enable is True:  # enable if inactive
                with evc.lock:
                    evc.deploy()
            elif evc.is_enabled() and redeploy:
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        result = {evc.id: evc.as_dict()}
        status = 200

        log.debug("update result %s %s", result, status)
        emit_event(self.controller, "updated",
                   content=map_evc_event_content(evc, **data))
        return JSONResponse(result, status_code=status)
426
427 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
    def delete_circuit(self, request: Request) -> JSONResponse:
        """Remove a circuit.

        The EVC is popped from the buffer. Then, under its lock, its
        current and failover flows are removed, it is deactivated,
        disabled, dropped from the scheduler, archived, its UNI tags are
        removed and it is synced. A "deleted" event is emitted afterwards.

        Raises:
            HTTPException: 404 when the circuit is not buffered.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits.pop(circuit_id)
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("delete_circuit result %s %s", result, 404)
            # Chain the caught instance (not the KeyError class) so the
            # cause keeps the missing key.
            raise HTTPException(404, detail=result) from error

        log.info("Removing %s", evc)
        with evc.lock:
            evc.remove_current_flows()
            evc.remove_failover_flows(sync=False)
            evc.deactivate()
            evc.disable()
            self.sched.remove(evc)
            evc.archive()
            evc.remove_uni_tags()
            evc.sync()
        log.info("EVC removed. %s", evc)
        result = {"response": f"Circuit {circuit_id} removed"}
        status = 200

        log.debug("delete_circuit result %s %s", result, status)
        emit_event(self.controller, "deleted",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)
461
462 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
    def get_metadata(self, request: Request) -> JSONResponse:
        """Return the metadata of a buffered EVC.

        Raises:
            HTTPException: 404 when a KeyError is raised while reading it,
                e.g. the circuit id is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        try:
            metadata = self.circuits[circuit_id].metadata
            return JSONResponse({"metadata": metadata})
        except KeyError as error:
            message = f"circuit_id {circuit_id} not found."
            raise HTTPException(404, detail=message) from error
475
476 1 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
    @validate_openapi(spec)
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to a bulk of EVCs.

        ``circuit_ids`` is taken out of the payload and what is left is the
        metadata. The controller is updated first, then every buffered EVC.

        Raises:
            HTTPException: 404 listing the ids for which a KeyError was
                raised (e.g. ids missing from the buffer).
        """
        payload = get_json_or_400(request, self.controller.loop)
        circuit_ids = payload.pop("circuit_ids")

        self.mongo_controller.update_evcs_metadata(circuit_ids, payload, "add")

        not_found = []
        for circuit_id in circuit_ids:
            try:
                self.circuits[circuit_id].extend_metadata(payload)
            except KeyError:
                not_found.append(circuit_id)

        if not_found:
            raise HTTPException(404, detail=not_found)
        return JSONResponse("Operation successful", status_code=201)
496
497 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
    @validate_openapi(spec)
    def add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to an EVC.

        The JSON body must be an object; it is merged into the EVC
        metadata and the EVC is then synced.

        Raises:
            HTTPException: 400 when the body is not a dict, 404 when the
                circuit is not buffered.
        """
        circuit_id = request.path_params["circuit_id"]
        metadata = get_json_or_400(request, self.controller.loop)
        if not isinstance(metadata, dict):
            # The message used to lack the ``f`` prefix, so clients got the
            # literal text "{metadata}" instead of the rejected value.
            raise HTTPException(
                400,
                detail=f"Invalid metadata value: {metadata}"
            )
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error

        evc.extend_metadata(metadata)
        evc.sync()
        return JSONResponse("Operation successful", status_code=201)
516
517 1 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
    @validate_openapi(spec)
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
        """Delete one metadata key from a bulk of EVCs.

        The controller is updated first, then the key is removed from
        every buffered EVC listed in ``circuit_ids``.

        Raises:
            HTTPException: 404 listing the ids for which a KeyError was
                raised (e.g. ids missing from the buffer).
        """
        payload = get_json_or_400(request, self.controller.loop)
        key = request.path_params["key"]
        circuit_ids = payload.pop("circuit_ids")
        self.mongo_controller.update_evcs_metadata(
            circuit_ids, {key: ""}, "del"
        )

        not_found = []
        for circuit_id in circuit_ids:
            try:
                self.circuits[circuit_id].remove_metadata(key)
            except KeyError:
                not_found.append(circuit_id)

        if not_found:
            raise HTTPException(404, detail=not_found)
        return JSONResponse("Operation successful")
539
540 1
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
    def delete_metadata(self, request: Request) -> JSONResponse:
        """Delete one metadata key from an EVC and sync the EVC.

        Raises:
            HTTPException: 404 when the circuit is not buffered.
        """
        params = request.path_params
        circuit_id = params["circuit_id"]
        key = params["key"]
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            message = f"circuit_id {circuit_id} not found."
            raise HTTPException(404, detail=message) from error

        evc.remove_metadata(key)
        evc.sync()
        return JSONResponse("Operation successful")
556
557 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
    def redeploy(self, request: Request) -> JSONResponse:
        """Endpoint to force the redeployment of an EVC.

        An enabled EVC has its current flows removed and is deployed again
        under its lock (202). A disabled EVC is left alone (409).

        Raises:
            HTTPException: 404 when the circuit is not buffered.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            # Chain the caught instance (not the KeyError class) so the
            # cause keeps the missing key.
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found"
            ) from error
        if evc.is_enabled():
            with evc.lock:
                evc.remove_current_flows()
                evc.deploy()
            result = {"response": f"Circuit {circuit_id} redeploy received."}
            status = 202
        else:
            result = {"response": f"Circuit {circuit_id} is disabled."}
            status = 409

        return JSONResponse(result, status_code=status)
580
581 1
    @rest("/v2/evc/schedule/", methods=["POST"])
    @validate_openapi(spec)
    def create_schedule(self, request: Request) -> JSONResponse:
        """
        Create a new schedule for a given circuit.

        This service does not check for conflicts with other schedules.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }

        Raises:
            HTTPException: 404 when the circuit is not in the buffer.
        """
        log.debug("create_schedule /v2/evc/schedule/")
        payload = get_json_or_400(request, self.controller.loop)
        circuit_id = payload["circuit_id"]
        schedule_payload = payload["schedule"]

        # Look the EVC up in the circuits buffer.
        evc = self._get_circuits_buffer().get(circuit_id)
        if not evc:
            message = f"circuit_id {circuit_id} not found"
            log.debug("create_schedule result %s %s", message, 404)
            raise HTTPException(404, detail=message)

        schedule = CircuitSchedule.from_dict(schedule_payload)

        # Start the schedule list when the EVC has none yet.
        if not evc.circuit_scheduler:
            evc.circuit_scheduler = []
        evc.circuit_scheduler.append(schedule)

        # Register the job, then save the circuit to mongodb.
        self.sched.add_circuit_job(evc, schedule)
        evc.sync()

        body = schedule.as_dict()
        log.debug("create_schedule result %s %s", body, 201)
        return JSONResponse(body, status_code=201)
637
638 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update_schedule(self, request: Request) -> JSONResponse:
        """Update a schedule.

        Replaces the given schedule of an EVC with one built from the
        payload; the new schedule takes over the id of the old one.
        Payload example:
            {
              "date": "2019-08-07T14:52:10.967Z",
              "interval": "string",
              "frequency": "1 * * *",
              "action": "create"
            }

        Raises:
            HTTPException: 404 when no schedule with that id is found.
        """
        payload = get_json_or_400(request, self.controller.loop)
        schedule_id = request.path_params["schedule_id"]
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)

        # Try to find a circuit schedule
        evc, old_schedule = self._find_evc_by_schedule_id(schedule_id)

        # Can not modify circuits deleted and archived
        if not old_schedule:
            message = f"schedule_id {schedule_id} not found"
            log.debug("update_schedule result %s %s", message, 404)
            raise HTTPException(404, detail=message)

        replacement = CircuitSchedule.from_dict(payload)
        replacement.id = old_schedule.id

        # Swap the schedule in the EVC's list.
        evc.circuit_scheduler.remove(old_schedule)
        evc.circuit_scheduler.append(replacement)

        # Swap the scheduler job as well.
        self.sched.cancel_job(old_schedule.id)
        self.sched.add_circuit_job(evc, replacement)

        # Save EVC to mongodb
        evc.sync()

        body = replacement.as_dict()
        log.debug("update_schedule result %s %s", body, 200)
        return JSONResponse(body, status_code=200)
685
686 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
    def delete_schedule(self, request: Request) -> JSONResponse:
        """Remove a circuit schedule.

        Remove the schedule from the EVC.
        Cancel the matching scheduler job.
        Sync the EVC.

        Raises:
            HTTPException: 404 when no schedule with that id is found.
        """
        schedule_id = request.path_params["schedule_id"]
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
        evc, schedule = self._find_evc_by_schedule_id(schedule_id)

        # Can not modify circuits deleted and archived
        if not schedule:
            message = f"schedule_id {schedule_id} not found"
            log.debug("delete_schedule result %s %s", message, 404)
            raise HTTPException(404, detail=message)

        evc.circuit_scheduler.remove(schedule)
        self.sched.cancel_job(schedule.id)
        # Save EVC to mongodb
        evc.sync()

        body = "Schedule removed"
        log.debug("delete_schedule result %s %s", body, 200)
        return JSONResponse(body, status_code=200)
717
718 1
    def _check_no_tag_duplication(
        self,
        evc_id: str,
        uni_a: Optional[UNI] = None,
        uni_z: Optional[UNI] = None
    ):
        """Check the given UNIs for no-tag duplication in other circuits.

        Every non-archived buffered circuit other than ``evc_id`` is asked,
        through ``check_no_tag_duplicate``, about each given UNI whose
        ``user_tag`` is None. Any exception raised by that call (the
        callers handle DuplicatedNoTagUNI) propagates.

        Args:
            evc_id: id of the circuit the UNIs belong to; it is skipped.
            uni_a: first UNI to check, if any.
            uni_z: second UNI to check, if any.
        """
        # No UNIs
        if not (uni_a or uni_z):
            return

        # Nothing to do unless at least one UNI comes without a tag.
        if not (
            (uni_a and not uni_a.user_tag)
            or (uni_z and not uni_z.user_tag)
        ):
            return

        untagged = [
            uni for uni in (uni_a, uni_z) if uni and uni.user_tag is None
        ]
        # Iterate over a copy of the buffer rather than the live dict.
        for circuit in self.circuits.copy().values():
            if not circuit.archived and circuit._id != evc_id:
                for uni in untagged:
                    circuit.check_no_tag_duplicate(uni)
743
744 1
    @listen_to("kytos/topology.link_up")
745 1
    def on_link_up(self, event):
        """Forward a link up (or end_maintenance) event to handle_link_up."""
        self.handle_link_up(event)
748
749 1
    def handle_link_up(self, event):
        """Tell every enabled, non-archived EVC that a link is up.

        EVCs are visited in the order returned by get_evcs_by_svc_level and
        each one handles the link while its own lock is held.
        """
        log.info("Event handle_link_up %s", event.content["link"])
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_enabled() or evc.archived:
                continue
            with evc.lock:
                evc.handle_link_up(event.content["link"])
756
757
    # Possibly replace this with interruptions?
758 1
    @listen_to(
759
        '.*.switch.interface.(link_up|link_down|created|deleted)'
760
    )
761 1
    def on_interface_link_change(self, event: KytosEvent):
        """
        Forward interface link_up, link_down, created and deleted events
        to handle_on_interface_link_change.
        """
        self.handle_on_interface_link_change(event)
766
767 1
    def handle_on_interface_link_change(self, event: KytosEvent):
        """
        Handler to sort interface events {link_(up, down), created, deleted}

        To avoid multiple database updates (link flap), events are tracked
        per interface id in self._intf_events:
        - an event older than the stored one is dropped;
        - otherwise it replaces the stored event;
        - the first caller for an interface sets "last_acquired", sleeps for
          settings.UNI_STATE_CHANGE_DELAY and then processes the event that
          is stored at that point; callers arriving meanwhile only update
          the stored event and return.
        """
        iface = event.content.get("interface")
        intf_id = iface.id
        with self._lock_interfaces[intf_id]:
            # Return out of order events
            is_out_of_order = (
                intf_id in self._intf_events
                and (self._intf_events[intf_id]["event"].timestamp
                     > event.timestamp)
            )
            if is_out_of_order:
                return
            state = self._intf_events[intf_id]
            state["event"] = event
            if "last_acquired" in state:
                # Another caller is already waiting to process this interface
                return
            state["last_acquired"] = now()

        time.sleep(settings.UNI_STATE_CHANGE_DELAY)

        with self._lock_interfaces[intf_id]:
            state = self._intf_events[intf_id]
            latest = state["event"]
            state.pop('last_acquired', None)
            event_type = latest.name.rpartition('.')[2]
            if event_type in ('link_up', 'created'):
                self.handle_interface_link_up(iface)
            elif event_type in ('link_down', 'deleted'):
                self.handle_interface_link_down(iface)
799
800 1
    def handle_interface_link_up(self, interface):
        """
        Handler for interface link_up events

        The event is logged once, then every EVC returned by
        get_evcs_by_svc_level handles the interface while its own lock is
        held.
        """
        # Log once, before the loop: logging per EVC inside the lock repeated
        # the same line for every circuit and logged nothing without circuits
        log.info("Event handle_interface_link_up %s", interface)
        for evc in self.get_evcs_by_svc_level():
            with evc.lock:
                evc.handle_interface_link_up(interface)
810
811 1
    def handle_interface_link_down(self, interface):
        """
        Handler for interface link_down events

        After logging the event, every EVC returned by
        get_evcs_by_svc_level handles the interface while its own lock is
        held.
        """
        log.info("Event handle_interface_link_down %s", interface)
        for circuit in self.get_evcs_by_svc_level():
            with circuit.lock:
                circuit.handle_interface_link_down(interface)
821
822 1
    @listen_to("kytos/topology.link_down", pool="dynamic_single")
823 1
    def on_link_down(self, event):
        """Forward a link down (or under maintenance) event to its handler."""
        self.handle_link_down(event)
826
827 1
    def handle_link_down(self, event):  # pylint: disable=too-many-branches
        """Change circuits when a link is down or under maintenance.

        Each EVC falls in one of three groups:
        - its current path is affected and it has an unaffected failover
          path: the failover becomes the current path and its flows are
          queued to be installed in batches;
        - its current path is affected and there is no usable failover (or
          swapping failed): an "evc_affected_by_link_down" event is emitted;
        - only its failover path is affected: the failover path is cleared.

        The EVCs of the first and third groups are then updated in the
        database and a "cleanup_evcs_old_path" event is emitted for them.
        """
        link = event.content["link"]
        log.info("Event handle_link_down %s", link)
        switch_flows = {}
        evcs_with_failover = []
        evcs_normal = []
        check_failover = []

        for evc in self.get_evcs_by_svc_level():
            with evc.lock:
                if not evc.is_affected_by_link(link):
                    # Only the failover path may be hit: discard it
                    if evc.is_failover_path_affected_by_link(link):
                        evc.old_path = evc.failover_path
                        evc.failover_path = Path([])
                        check_failover.append(evc)
                    continue

                evc.affected_by_link_at = event.timestamp
                # if there is no failover path, handles link down the
                # traditional way
                if (
                    not getattr(evc, 'failover_path', None) or
                    evc.is_failover_path_affected_by_link(link)
                ):
                    evcs_normal.append(evc)
                    continue

                try:
                    dpid_flows = evc.get_failover_flows()
                    evc.old_path = evc.current_path
                    evc.current_path = evc.failover_path
                    evc.failover_path = Path([])
                # pylint: disable=broad-except
                except Exception:
                    err = traceback.format_exc().replace("\n", ", ")
                    log.error(
                        f"Ignore Failover path for {evc} due to error: {err}"
                    )
                    evcs_normal.append(evc)
                    continue

                for dpid, flows in dpid_flows.items():
                    switch_flows.setdefault(dpid, []).extend(flows)
                evcs_with_failover.append(evc)

        # Install the failover flows, at most BATCH_SIZE per switch per round
        while switch_flows:
            offset = settings.BATCH_SIZE or None
            for dpid in list(switch_flows):
                pending = switch_flows[dpid]
                emit_event(
                    self.controller,
                    context="kytos.flow_manager",
                    name="flows.install",
                    content={
                        "dpid": dpid,
                        "flow_dict": {"flows": pending[:offset]},
                    }
                )
                if offset is not None and offset < len(pending):
                    switch_flows[dpid] = pending[offset:]
                else:
                    del switch_flows[dpid]
            time.sleep(settings.BATCH_INTERVAL)

        for evc in evcs_normal:
            emit_event(
                self.controller,
                "evc_affected_by_link_down",
                content={"link": link} | map_evc_event_content(evc)
            )

        evcs_to_update = []
        for evc in evcs_with_failover:
            evcs_to_update.append(evc.as_dict())
            emit_event(
                self.controller,
                "redeployed_link_down",
                content=map_evc_event_content(evc)
            )
            log.info(
                f"{evc} redeployed with failover due to link down {link.id}"
            )
        evcs_to_update.extend(evc.as_dict() for evc in check_failover)

        self.mongo_controller.update_evcs(evcs_to_update)

        emit_event(
            self.controller,
            "cleanup_evcs_old_path",
            content={"evcs": evcs_with_failover + check_failover}
        )
917
918 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
919 1
    def on_evc_affected_by_link_down(self, event):
        """Forward an evc_affected_by_link_down event to its handler."""
        self.handle_evc_affected_by_link_down(event)
922
923 1
    def handle_evc_affected_by_link_down(self, event):
        """Handle the link down for a single EVC and report the outcome.

        Nothing happens when the EVC named by ``evc_id`` is unknown or is no
        longer affected by the link. Otherwise ``evc.handle_link_down()``
        runs under the EVC lock and either "redeployed_link_down" or
        "error_redeploy_link_down" is emitted according to its result.
        """
        evc = self.circuits.get(event.content["evc_id"])
        link = event.content['link']
        if not evc:
            return

        with evc.lock:
            if not evc.is_affected_by_link(link):
                return
            result = evc.handle_link_down()

        if result:
            log.info(f"{evc} redeployed due to link down {link.id}")
            event_name = "redeployed_link_down"
        else:
            event_name = "error_redeploy_link_down"
        emit_event(
            self.controller, event_name, content=map_evc_event_content(evc)
        )
939
940 1
    @listen_to("kytos/mef_eline.cleanup_evcs_old_path")
941 1
    def on_cleanup_evcs_old_path(self, event):
        """Forward a cleanup_evcs_old_path event to its handler."""
        self.handle_cleanup_evcs_old_path(event)
944
945 1
    def handle_cleanup_evcs_old_path(self, event):
        """Remove the flows of each EVC's old path and drop the attribute.

        EVCs come from the "evcs" entry of the event content (none when the
        entry is absent); those without an ``old_path`` attribute are
        skipped.
        """
        for circuit in event.content.get("evcs", []):
            if hasattr(circuit, 'old_path'):
                circuit.remove_path_flows(circuit.old_path)
                del circuit.old_path
953
954 1
    @listen_to("kytos/topology.topology_loaded")
955 1
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
        """Load all EVCs when the topology_loaded event is received."""
        self.load_all_evcs()
958
959 1
    def load_all_evcs(self):
        """Try to load every stored EVC that is not in memory yet.

        Once done, an "evcs_loaded" event carrying all the stored circuits
        is emitted.
        """
        stored = self.mongo_controller.get_circuits()['circuits']
        for circuit_id, circuit in stored.items():
            if circuit_id in self.circuits:
                continue
            self._load_evc(circuit)
        emit_event(
            self.controller,
            "evcs_loaded",
            content=dict(stored.items()),
            timeout=1,
        )
967
968 1
    def _load_evc(self, circuit_dict):
        """Load one EVC from its stored dict into memory.

        Returns:
            The EVC registered in self.circuits and in the scheduler, or
            None when it could not be built (the error is logged) or when
            it is archived.
        """
        try:
            evc = self._evc_from_dict(circuit_dict)
        except (ValueError, KytosTagError) as exception:
            log.error(
                f"Could not load EVC: dict={circuit_dict} error={exception}"
            )
            return None

        if not evc.archived:
            self.circuits.setdefault(evc.id, evc)
            self.sched.add(evc)
            return evc
        return None
983
984 1
    @listen_to("kytos/flow_manager.flow.error")
985 1
    def on_flow_mod_error(self, event):
        """Forward a flow mod error event to handle_flow_mod_error."""
        self.handle_flow_mod_error(event)
988
989 1
    def handle_flow_mod_error(self, event):
        """Remove the current flows of the EVC whose flow add failed.

        Only errors whose "error_command" is "add" are handled. The EVC is
        found through the id derived from the flow cookie; unknown EVCs are
        ignored.
        """
        flow = event.content["flow"]
        if event.content.get("error_command") != "add":
            return
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
        if not evc:
            return
        with evc.lock:
            evc.remove_current_flows()
999
1000 1
    def _evc_dict_with_instances(self, evc_dict):
1001
        """Convert some dict values to instance of EVC classes.
1002
1003
        This method will convert: [UNI, Link]
1004
        """
1005 1
        data = evc_dict.copy()  # Do not modify the original dict
1006 1
        for attribute, value in data.items():
1007
            # Get multiple attributes.
1008
            # Ex: uni_a, uni_z
1009 1
            if "uni" in attribute:
1010 1
                try:
1011 1
                    data[attribute] = self._uni_from_dict(value)
1012 1
                except ValueError as exception:
1013 1
                    result = "Error creating UNI: Invalid value"
1014 1
                    raise ValueError(result) from exception
1015
1016 1
            if attribute == "circuit_scheduler":
1017 1
                data[attribute] = []
1018 1
                for schedule in value:
1019 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
1020
1021
            # Get multiple attributes.
1022
            # Ex: primary_links,
1023
            #     backup_links,
1024
            #     current_links_cache,
1025
            #     primary_links_cache,
1026
            #     backup_links_cache
1027 1
            if "links" in attribute:
1028 1
                data[attribute] = [
1029
                    self._link_from_dict(link) for link in value
1030
                ]
1031
1032
            # Ex: current_path,
1033
            #     primary_path,
1034
            #     backup_path
1035 1
            if "path" in attribute and attribute != "dynamic_backup_path":
1036 1
                data[attribute] = Path(
1037
                    [self._link_from_dict(link) for link in value]
1038
                )
1039
1040 1
        return data
1041
1042 1
    def _evc_from_dict(self, evc_dict):
        """Build an EVC from a python dict.

        The dict values are first converted by _evc_dict_with_instances and
        "table_group" is always set to this NApp's table_group.
        """
        evc_kwargs = self._evc_dict_with_instances(evc_dict)
        evc_kwargs["table_group"] = self.table_group
        return EVC(self.controller, **evc_kwargs)
1046
1047 1
    def _uni_from_dict(self, uni_dict):
1048
        """Return a UNI object from python dict."""
1049 1
        if uni_dict is None:
1050 1
            return False
1051
1052 1
        interface_id = uni_dict.get("interface_id")
1053 1
        interface = self.controller.get_interface_by_id(interface_id)
1054 1
        if interface is None:
1055 1
            result = (
1056
                "Error creating UNI:"
1057
                + f"Could not instantiate interface {interface_id}"
1058
            )
1059 1
            raise ValueError(result) from ValueError
1060 1
        tag_convert = {1: "vlan"}
1061 1
        tag_dict = uni_dict.get("tag", None)
1062 1
        if tag_dict:
1063 1
            tag_type = tag_dict.get("tag_type")
1064 1
            tag_type = tag_convert.get(tag_type, tag_type)
1065 1
            tag_value = tag_dict.get("value")
1066 1
            if isinstance(tag_value, list):
1067 1
                tag_value = get_tag_ranges(tag_value)
1068 1
                mask_list = get_vlan_tags_and_masks(tag_value)
1069 1
                tag = TAGRange(tag_type, tag_value, mask_list)
1070
            else:
1071 1
                tag = TAG(tag_type, tag_value)
1072
        else:
1073 1
            tag = None
1074 1
        uni = UNI(interface, tag)
1075 1
        return uni
1076
1077 1
    def _link_from_dict(self, link_dict):
1078
        """Return a Link object from python dict."""
1079 1
        id_a = link_dict.get("endpoint_a").get("id")
1080 1
        id_b = link_dict.get("endpoint_b").get("id")
1081
1082 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
1083 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
1084 1
        if not endpoint_a:
1085 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
1086 1
            raise ValueError(error_msg)
1087 1
        if not endpoint_b:
1088
            error_msg = f"Could not get interface endpoint_b id {id_b}"
1089
            raise ValueError(error_msg)
1090
1091 1
        link = Link(endpoint_a, endpoint_b)
1092 1
        if "metadata" in link_dict:
1093 1
            link.extend_metadata(link_dict.get("metadata"))
1094
1095 1
        s_vlan = link.get_metadata("s_vlan")
1096 1
        if s_vlan:
1097 1
            tag = TAG.from_dict(s_vlan)
1098 1
            if tag is False:
1099
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
1100
                raise ValueError(error_msg)
1101 1
            link.update_metadata("s_vlan", tag)
1102 1
        return link
1103
1104 1
    def _find_evc_by_schedule_id(self, schedule_id):
1105
        """
1106
        Find an EVC and CircuitSchedule based on schedule_id.
1107
1108
        :param schedule_id: Schedule ID
1109
        :return: EVC and Schedule
1110
        """
1111 1
        circuits = self._get_circuits_buffer()
1112 1
        found_schedule = None
1113 1
        evc = None
1114
1115
        # pylint: disable=unused-variable
1116 1
        for c_id, circuit in circuits.items():
1117 1
            for schedule in circuit.circuit_scheduler:
1118 1
                if schedule.id == schedule_id:
1119 1
                    found_schedule = schedule
1120 1
                    evc = circuit
1121 1
                    break
1122 1
            if found_schedule:
1123 1
                break
1124 1
        return evc, found_schedule
1125
1126 1
    def _get_circuits_buffer(self):
1127
        """
1128
        Return the circuit buffer.
1129
1130
        If the buffer is empty, try to load data from mongodb.
1131
        """
1132 1
        if not self.circuits:
1133
            # Load circuits from mongodb to buffer
1134 1
            circuits = self.mongo_controller.get_circuits()['circuits']
1135 1
            for c_id, circuit in circuits.items():
1136 1
                evc = self._evc_from_dict(circuit)
1137 1
                self.circuits[c_id] = evc
1138 1
        return self.circuits
1139
1140
    # pylint: disable=attribute-defined-outside-init
1141 1
    @alisten_to("kytos/of_multi_table.enable_table")
1142 1
    async def on_table_enabled(self, event):
        """Handle a recently enabled table.

        The "mef_eline" entry of the event content is merged into
        self.table_group and a "kytos/mef_eline.enable_table" event is
        emitted. Nothing is changed when the entry is missing or empty, or
        when any of its groups is not in settings.TABLE_GROUP_ALLOWED.
        """
        table_group = event.content.get("mef_eline", None)
        if not table_group:
            return

        for group in table_group:
            if group in settings.TABLE_GROUP_ALLOWED:
                continue
            log.error(f'The table group "{group}" is not allowed for '
                      f'mef_eline. Allowed table groups are '
                      f'{settings.TABLE_GROUP_ALLOWED}')
            return

        self.table_group.update(table_group)
        await aemit_event(
            self.controller,
            "kytos/mef_eline.enable_table",
            {"group_table": self.table_group},
        )
1157