Passed
Pull Request — master (#438)
by Italo Valcy
03:55
created

build.main.Main.on_flow_mod_error()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1.037

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 2
dl 0
loc 4
ccs 2
cts 3
cp 0.6667
crap 1.037
rs 10
c 0
b 0
f 0
1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from collections import defaultdict
10 1
from threading import Lock
11 1
from typing import Optional
12
13 1
from pydantic import ValidationError
14
15 1
from kytos.core import KytosNApp, log, rest
16 1
from kytos.core.events import KytosEvent
17 1
from kytos.core.exceptions import KytosTagError
18 1
from kytos.core.helpers import (alisten_to, listen_to, load_spec, now,
19
                                validate_openapi)
20 1
from kytos.core.interface import TAG, UNI, TAGRange
21 1
from kytos.core.link import Link
22 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
23
                                 get_json_or_400)
24 1
from kytos.core.tag_ranges import get_tag_ranges
25 1
from napps.kytos.mef_eline import controllers, settings
26 1
from napps.kytos.mef_eline.exceptions import (DisabledSwitch,
27
                                              DuplicatedNoTagUNI, InvalidPath)
28 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
29
                                          Path)
30 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
31 1
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
32
                                         emit_event, get_vlan_tags_and_masks,
33
                                         map_evc_event_content)
34
35
36
# pylint: disable=too-many-public-methods
37 1
class Main(KytosNApp):
38
    """Main class of amlight/mef_eline NApp.
39
40
    This class is the entry point for this napp.
41
    """
42
43 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
44
45 1
    def setup(self):
        """Initialise the NApp state; stands in for ``__init__``.

        Builds the scheduler and the ELine controller, registers the Kytos
        controller on the dynamic path manager, creates the in-memory
        circuit buffer and the locks, schedules ``execute`` to run
        periodically and finally loads all EVCs.
        """
        # Scheduler used for circuit events.
        self.sched = Scheduler()

        # Controller used to save and load circuits.
        self.mongo_controller = self.get_eline_controller()
        self.mongo_controller.bootstrap_indexes()

        # The dynamic path manager needs a reference to the controller.
        DynamicPathManager.set_controller(self.controller)

        # Buffer of created EVCs, keyed by circuit id.
        # Every create/update/delete must be synced to mongodb.
        self.circuits = {}

        self.table_group = {"epl": 0, "evpl": 0}

        # Per-interface event bookkeeping and its per-interface locks.
        self._intf_events = defaultdict(dict)
        self._lock_interfaces = defaultdict(Lock)

        # _lock guards the consistency routine started by execute().
        self._lock = Lock()
        self._lock_handle_link_down = Lock()

        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

        self.load_all_evcs()
        self._topology_updated_at = None
76
77 1
    def get_evcs_by_svc_level(self, enable_filter: bool = True) -> list:
        """Return buffered circuits ordered by priority.

        Circuits are sorted by descending ``service_level`` and, within the
        same level, by ascending ``creation_time``.

        Args:
            enable_filter: when True (default) only circuits whose
                ``is_enabled()`` is truthy are returned.

        In the future, as more ops are offloaded it should be get from the DB.
        """
        candidates = self.circuits.values()
        if enable_filter:
            candidates = (evc for evc in candidates if evc.is_enabled())
        return sorted(
            candidates,
            key=lambda evc: (-evc.service_level, evc.creation_time),
        )
90
91 1
    @staticmethod
    def get_eline_controller():
        """Build and return a new ``controllers.ELineController``."""
        eline_controller = controllers.ELineController()
        return eline_controller
95
96 1
    def execute(self):
        """Run one round of the consistency routine.

        The round is skipped when ``self._lock`` is already held, i.e. a
        previous round is still in progress.
        """
        routine_lock = self._lock
        if routine_lock.locked():
            return
        log.debug("Starting consistency routine")
        with routine_lock:
            self.execute_consistency()
        log.debug("Finished consistency routine")
104
105 1
    def should_be_checked(self, circuit):
        """Tell whether the circuit qualifies for the consistency check.

        Returns True only for a circuit that is enabled, not active, not
        locked, has no recently removed flow, was not recently updated, has
        active UNIs and is either intra-switch or has a current path.
        """
        if not circuit.is_enabled() or circuit.is_active():
            return False
        if circuit.lock.locked():
            return False
        if circuit.has_recent_removed_flow() or circuit.is_recent_updated():
            return False
        if not circuit.are_unis_active(self.controller.switches):
            return False
        # if a inter-switch EVC does not have current_path, it does not
        # make sense to run sdntrace on it
        return bool(circuit.is_intra_switch() or circuit.current_path)
121
122 1
    def execute_consistency(self):
        """Check enabled-but-inactive circuits and load missing ones.

        Circuits accepted by ``should_be_checked`` are traced in bulk. A
        circuit whose trace succeeded is activated and synced; otherwise
        its ``execution_rounds`` counter grows and, once it exceeds
        ``settings.WAIT_FOR_OLD_PATH``, the circuit is redeployed. Circuits
        present in mongodb but absent from the buffer are loaded.
        """
        # Starts as everything stored; buffered circuits are popped below,
        # so what remains is stored but not loaded.
        unloaded = self.mongo_controller.get_circuits()['circuits']
        candidates = []
        for evc in self.get_evcs_by_svc_level(enable_filter=False):
            unloaded.pop(evc.id, None)
            if self.should_be_checked(evc):
                candidates.append(evc)

        trace_results = EVCDeploy.check_list_traces(candidates)
        for evc in candidates:
            if trace_results.get(evc.id):
                evc.execution_rounds = 0
                log.info(f"{evc} enabled but inactive - activating")
                with evc.lock:
                    evc.activate()
                    evc.sync()
                continue

            evc.execution_rounds += 1
            if evc.execution_rounds > settings.WAIT_FOR_OLD_PATH:
                log.info(f"{evc} enabled but inactive - redeploy")
                with evc.lock:
                    evc.deploy()

        for circuit_id, stored_evc in unloaded.items():
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
            self._load_evc(stored_evc)
148
149 1
    def shutdown(self):
        """Hook executed when the NApp is unloaded.

        No cleanup is performed at the moment; add it here if needed.
        """
154
155 1
    @rest("/v2/evc/", methods=["GET"])
    def list_circuits(self, request: Request) -> JSONResponse:
        """Endpoint that lists the stored circuits.

        The ``archived`` query argument (lower-cased, default ``"false"``)
        is forwarded as the archived filter; every other query argument is
        forwarded as a metadata filter.
        """
        log.debug("list_circuits /v2/evc")
        query = request.query_params
        archived = query.get("archived", "false").lower()
        metadata_filter = {
            key: value for key, value in query.items() if key != "archived"
        }
        stored = self.mongo_controller.get_circuits(archived=archived,
                                                    metadata=metadata_filter)
        return JSONResponse(stored['circuits'])
170
171 1
    @rest("/v2/evc/schedule", methods=["GET"])
    def list_schedules(self, _request: Request) -> JSONResponse:
        """Endpoint that lists every schedule of every stored circuit.

        Return a JSON with the following template:
        [{"schedule_id": <schedule_id>,
         "circuit_id": <circuit_id>,
         "schedule": <schedule object>}]

        When no circuit is stored at all, an empty JSON object is returned
        instead of a list.
        """
        log.debug("list_schedules /v2/evc/schedule")
        stored = self.mongo_controller.get_circuits()['circuits'].values()
        if not stored:
            return JSONResponse({}, status_code=200)

        schedules = [
            {
                "schedule_id": schedule.get("id"),
                "circuit_id": circuit.get("id"),
                "schedule": schedule,
            }
            for circuit in stored
            # circuits without schedules contribute nothing
            for schedule in (circuit.get("circuit_scheduler") or [])
        ]

        log.debug("list_schedules result %s %s", schedules, 200)
        return JSONResponse(schedules, status_code=200)
202
203 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
    def get_circuit(self, request: Request) -> JSONResponse:
        """Endpoint that returns one stored circuit by its id.

        Raises:
            HTTPException: 404 when no circuit is found for the id.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("get_circuit /v2/evc/%s", circuit_id)
        found = self.mongo_controller.get_circuit(circuit_id)
        if found:
            log.debug("get_circuit result %s %s", found, 200)
            return JSONResponse(found, status_code=200)

        message = f"circuit_id {circuit_id} not found"
        log.debug("get_circuit result %s %s", message, 404)
        raise HTTPException(404, detail=message)
216
217
    # pylint: disable=too-many-branches, too-many-statements
218 1
    @rest("/v2/evc/", methods=["POST"])
    @validate_openapi(spec)
    def create_circuit(self, request: Request) -> JSONResponse:
        """Try to create a new circuit from the request payload.

        Steps, in order:

        1. Build the EVC from the JSON body (400 on ValueError or
           KytosTagError).
        2. Reject it when a UNI component is disabled (409).
        3. Validate ``primary_path`` and ``backup_path`` when given (400).
        4. Require both UNIs to carry the same tag lists (400) and the EVC
           to have a primary or dynamic path (400).
        5. Reject duplicated untagged UNIs (409).
        6. Mark the UNI tags as used (400 on KytosTagError).
        7. Sync the EVC (400 on ValidationError), store it in the buffer
           and register it on the scheduler.
        8. Deploy right away when the EVC has no schedule.

        Finally, emit the "created" event and answer 201 with the new
        circuit id.
        """
        log.debug("create_circuit /v2/evc/")
        payload = get_json_or_400(request, self.controller.loop)

        # Try to create the circuit object
        try:
            evc = self._evc_from_dict(payload)
        except (ValueError, KytosTagError) as exception:
            log.debug("create_circuit result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception

        try:
            check_disabled_component(evc.uni_a, evc.uni_z)
        except DisabledSwitch as exception:
            log.debug("create_circuit result %s %s", exception, 409)
            raise HTTPException(
                409,
                detail=f"Path is not valid: {exception}"
            ) from exception

        # Both static paths go through the same validation; only the
        # attribute name shown in the error message differs.
        for path_name in ("primary_path", "backup_path"):
            static_path = getattr(evc, path_name)
            if not static_path:
                continue
            try:
                static_path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise HTTPException(
                    400,
                    detail=f"{path_name} is not valid: {exception}"
                ) from exception

        if not evc._tag_lists_equal():
            raise HTTPException(
                400,
                detail="UNI_A and UNI_Z tag lists should be the same."
            )

        try:
            evc._validate_has_primary_or_dynamic()
        except ValueError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        try:
            self._check_no_tag_duplication(evc.id, evc.uni_a, evc.uni_z)
        except DuplicatedNoTagUNI as exception:
            log.debug("create_circuit result %s %s", exception, 409)
            raise HTTPException(409, detail=str(exception)) from exception

        try:
            self._use_uni_tags(evc)
        except KytosTagError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        # save circuit
        try:
            evc.sync()
        except ValidationError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        # store circuit in dictionary and schedule its deploy
        self.circuits[evc.id] = evc
        self.sched.add(evc)

        # Circuit has no schedule, deploy now
        if not evc.circuit_scheduler:
            with evc.lock:
                evc.deploy()

        # Notify users
        response = {"circuit_id": evc.id}
        log.debug("create_circuit result %s %s", response, 201)
        emit_event(self.controller, name="created",
                   content=map_evc_event_content(evc))
        return JSONResponse(response, status_code=201)
331
332 1
    @staticmethod
    def _use_uni_tags(evc):
        """Mark the tags of both UNIs of ``evc`` as used.

        If UNI Z fails with KytosTagError, the tag already taken for UNI A
        is made available again before the error is re-raised.
        """
        first_uni = evc.uni_a
        evc._use_uni_vlan(first_uni)
        try:
            evc._use_uni_vlan(evc.uni_z)
        except KytosTagError as err:
            # Roll back UNI A so a failed request leaves no tag in use.
            evc.make_uni_vlan_available(first_uni)
            raise err
342
343 1
    @listen_to('kytos/flow_manager.flow.removed')
    def on_flow_delete(self, event):
        """Listen to flow removed events and delegate to the handler.

        The work is done by ``handle_flow_delete``.
        """
        self.handle_flow_delete(event)
347
348 1
    def handle_flow_delete(self, event):
        """Record on the owning EVC that one of its flows was removed.

        The EVC id is derived from the cookie of the removed flow; events
        whose cookie maps to no buffered EVC are ignored.
        """
        removed_flow = event.content["flow"]
        evc_id = EVC.get_id_from_cookie(removed_flow.cookie)
        evc = self.circuits.get(evc_id)
        if not evc:
            return
        log.debug("Flow removed in EVC %s", evc.id)
        evc.set_flow_removed_at()
355
356 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update(self, request: Request) -> JSONResponse:
        """Update a circuit based on payload.

        The EVC attributes (creation_time, active, current_path,
        failover_path, _id, archived) can't be updated.

        After the attributes are updated the circuit is removed, deployed
        or redeployed depending on its active state and on the
        ``(enable, redeploy)`` pair returned by ``evc.update``. An
        "updated" event is emitted at the end.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer; 400
                on ValueError, KytosTagError or ValidationError; 409 on
                DuplicatedNoTagUNI or DisabledSwitch.
        """
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = request.path_params["circuit_id"]
        log.debug("update /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("update result %s %s", result, 404)
            # Chain from the caught instance (not the KeyError class) so
            # the missing key and its traceback are kept as the cause.
            raise HTTPException(404, detail=result) from error

        try:
            updated_data = self._evc_dict_with_instances(data)
            self._check_no_tag_duplication(
                circuit_id, updated_data.get("uni_a"),
                updated_data.get("uni_z")
            )
            enable, redeploy = evc.update(**updated_data)
        except (ValueError, KytosTagError, ValidationError) as exception:
            log.debug("update result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception
        except DuplicatedNoTagUNI as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(409, detail=str(exception)) from exception
        except DisabledSwitch as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(
                    409,
                    detail=f"Path is not valid: {exception}"
                ) from exception

        if evc.is_active():
            if enable is False:  # disable if active
                with evc.lock:
                    evc.remove()
            elif redeploy is not None:  # redeploy if active
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        else:
            if enable is True:  # enable if inactive
                with evc.lock:
                    evc.deploy()
            elif evc.is_enabled() and redeploy:
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        result = {evc.id: evc.as_dict()}
        status = 200

        log.debug("update result %s %s", result, status)
        emit_event(self.controller, "updated",
                   content=map_evc_event_content(evc, **data))
        return JSONResponse(result, status_code=status)
417
418 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
    def delete_circuit(self, request: Request) -> JSONResponse:
        """Remove a circuit.

        The circuit is taken out of the buffer. Then, under its lock, its
        current and failover flows are removed, it is deactivated,
        disabled, dropped from the scheduler, archived, its UNI tags are
        released and the result is synced. A "deleted" event is emitted at
        the end.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits.pop(circuit_id)
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("delete_circuit result %s %s", result, 404)
            # Chain from the caught instance (not the KeyError class) so
            # the missing key and its traceback are kept as the cause.
            raise HTTPException(404, detail=result) from error

        log.info("Removing %s", evc)
        with evc.lock:
            evc.remove_current_flows()
            evc.remove_failover_flows(sync=False)
            evc.deactivate()
            evc.disable()
            self.sched.remove(evc)
            evc.archive()
            evc.remove_uni_tags()
            evc.sync()
        log.info("EVC removed. %s", evc)
        result = {"response": f"Circuit {circuit_id} removed"}
        status = 200

        log.debug("delete_circuit result %s %s", result, status)
        emit_event(self.controller, "deleted",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)
452
453 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
    def get_metadata(self, request: Request) -> JSONResponse:
        """Return the metadata of one buffered EVC.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        try:
            payload = {"metadata": self.circuits[circuit_id].metadata}
            return JSONResponse(payload)
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error
466
467 1 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
    @validate_openapi(spec)
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to several EVCs at once.

        The body carries ``circuit_ids`` plus the metadata to add. The
        stored EVCs are updated first, then each buffered EVC. Ids that
        raise KeyError while being processed are collected and reported
        in a single 404.
        """
        metadata = get_json_or_400(request, self.controller.loop)
        # What is left in the payload after the pop is the metadata itself.
        target_ids = metadata.pop("circuit_ids")

        self.mongo_controller.update_evcs(target_ids, metadata, "add")

        missing = []
        for evc_id in target_ids:
            try:
                self.circuits[evc_id].extend_metadata(metadata)
            except KeyError:
                missing.append(evc_id)

        if missing:
            raise HTTPException(404, detail=missing)
        return JSONResponse("Operation successful", status_code=201)
487
488 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
    @validate_openapi(spec)
    def add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to an EVC.

        The JSON body must be an object; its items extend the metadata of
        the EVC, which is then synced.

        Raises:
            HTTPException: 400 when the body is not a JSON object; 404
                when the circuit is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        metadata = get_json_or_400(request, self.controller.loop)
        if not isinstance(metadata, dict):
            # f-string so the offending value is reported, not the
            # literal "{metadata}" placeholder.
            raise HTTPException(400, f"Invalid metadata value: {metadata}")
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error

        evc.extend_metadata(metadata)
        evc.sync()
        return JSONResponse("Operation successful", status_code=201)
507
508 1 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
    @validate_openapi(spec)
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
        """Delete one metadata key from several EVCs at once.

        The body carries ``circuit_ids``; the key comes from the path. The
        stored EVCs are updated first, then each buffered EVC. Ids that
        raise KeyError while being processed are collected and reported
        in a single 404.
        """
        payload = get_json_or_400(request, self.controller.loop)
        key = request.path_params["key"]
        target_ids = payload.pop("circuit_ids")
        self.mongo_controller.update_evcs(target_ids, {key: ""}, "del")

        missing = []
        for evc_id in target_ids:
            try:
                self.circuits[evc_id].remove_metadata(key)
            except KeyError:
                missing.append(evc_id)

        if missing:
            raise HTTPException(404, detail=missing)
        return JSONResponse("Operation successful")
528
529 1
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
    def delete_metadata(self, request: Request) -> JSONResponse:
        """Delete one metadata key from an EVC and sync it.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer.
        """
        params = request.path_params
        circuit_id = params["circuit_id"]
        key = params["key"]
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            not_found = f"circuit_id {circuit_id} not found."
            raise HTTPException(404, detail=not_found) from error

        evc.remove_metadata(key)
        evc.sync()
        return JSONResponse("Operation successful")
545
546 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
    def redeploy(self, request: Request) -> JSONResponse:
        """Endpoint to force the redeployment of an EVC.

        An enabled circuit has its current flows removed and is deployed
        again under its lock (202). A disabled circuit is left untouched
        and 409 is returned.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            # Chain from the caught instance (not the KeyError class) so
            # the missing key and its traceback are kept as the cause.
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found"
            ) from error
        if evc.is_enabled():
            with evc.lock:
                evc.remove_current_flows()
                evc.deploy()
            result = {"response": f"Circuit {circuit_id} redeploy received."}
            status = 202
        else:
            result = {"response": f"Circuit {circuit_id} is disabled."}
            status = 409

        return JSONResponse(result, status_code=status)
569
570 1
    @rest("/v2/evc/schedule/", methods=["POST"])
    @validate_openapi(spec)
    def create_schedule(self, request: Request) -> JSONResponse:
        """
        Create a new schedule for a given circuit.

        This service does not check for conflicts with other schedules.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }

        Raises:
            HTTPException: 404 when the circuit is not found.
        """
        log.debug("create_schedule /v2/evc/schedule/")
        payload = get_json_or_400(request, self.controller.loop)
        circuit_id = payload["circuit_id"]
        schedule_data = payload["schedule"]

        # Look the EVC up in the circuits buffer
        evc = self._get_circuits_buffer().get(circuit_id)
        if not evc:
            message = f"circuit_id {circuit_id} not found"
            log.debug("create_schedule result %s %s", message, 404)
            raise HTTPException(404, detail=message)

        schedule = CircuitSchedule.from_dict(schedule_data)

        # The EVC may not have a schedule list yet
        if not evc.circuit_scheduler:
            evc.circuit_scheduler = []
        evc.circuit_scheduler.append(schedule)

        # Register the job, then save the circuit to mongodb
        self.sched.add_circuit_job(evc, schedule)
        evc.sync()

        response = schedule.as_dict()
        log.debug("create_schedule result %s %s", response, 201)
        return JSONResponse(response, status_code=201)
626
627 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update_schedule(self, request: Request) -> JSONResponse:
        """Update a schedule.

        Replaces every attribute of the given schedule of an EVC with the
        payload; the schedule id is preserved.
        Payload example:
            {
              "date": "2019-08-07T14:52:10.967Z",
              "interval": "string",
              "frequency": "1 * * * *",
              "action": "create"
            }

        Raises:
            HTTPException: 404 when the schedule is not found.
        """
        payload = get_json_or_400(request, self.controller.loop)
        schedule_id = request.path_params["schedule_id"]
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)

        # Try to find a circuit schedule
        evc, current = self._find_evc_by_schedule_id(schedule_id)

        # Can not modify circuits deleted and archived
        if not current:
            message = f"schedule_id {schedule_id} not found"
            log.debug("update_schedule result %s %s", message, 404)
            raise HTTPException(404, detail=message)

        replacement = CircuitSchedule.from_dict(payload)
        replacement.id = current.id

        # Swap the old schedule for the modified one
        evc.circuit_scheduler.remove(current)
        evc.circuit_scheduler.append(replacement)

        # Swap the scheduled job as well
        self.sched.cancel_job(current.id)
        self.sched.add_circuit_job(evc, replacement)

        # Save EVC to mongodb
        evc.sync()

        response = replacement.as_dict()
        log.debug("update_schedule result %s %s", response, 200)
        return JSONResponse(response, status_code=200)
674
675 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
    def delete_schedule(self, request: Request) -> JSONResponse:
        """Remove a circuit schedule.

        The schedule is removed from its EVC, its job is cancelled on the
        scheduler and the EVC is synced.

        Raises:
            HTTPException: 404 when the schedule is not found.
        """
        schedule_id = request.path_params["schedule_id"]
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
        evc, target = self._find_evc_by_schedule_id(schedule_id)

        # Can not modify circuits deleted and archived
        if not target:
            message = f"schedule_id {schedule_id} not found"
            log.debug("delete_schedule result %s %s", message, 404)
            raise HTTPException(404, detail=message)

        # Drop the schedule, cancel its job, then save EVC to mongodb
        evc.circuit_scheduler.remove(target)
        self.sched.cancel_job(target.id)
        evc.sync()

        response = "Schedule removed"
        log.debug("delete_schedule result %s %s", response, 200)
        return JSONResponse(response, status_code=200)
706
707 1
    def _check_no_tag_duplication(
        self,
        evc_id: str,
        uni_a: Optional[UNI] = None,
        uni_z: Optional[UNI] = None
    ):
        """Check that untagged UNIs are not already used by another EVC.

        Each given UNI without a user tag is checked against every
        non-archived buffered circuit other than ``evc_id`` through
        ``check_no_tag_duplicate``, which is expected to raise
        DuplicatedNoTagUNI on duplication.

        Args:
            evc_id: id of the EVC owning the UNIs; it is skipped.
            uni_a: UNI A to check, if any.
            uni_z: UNI Z to check, if any.
        """
        # No UNIs
        if not uni_a and not uni_z:
            return

        # Nothing to check unless at least one given UNI has no tag
        a_lacks_tag = bool(uni_a) and not uni_a.user_tag
        z_lacks_tag = bool(uni_z) and not uni_z.user_tag
        if not a_lacks_tag and not z_lacks_tag:
            return

        untagged = [
            uni for uni in (uni_a, uni_z)
            if uni and uni.user_tag is None
        ]
        # Iterate over a copy of the circuits buffer
        for circuit in self.circuits.copy().values():
            if circuit.archived or circuit._id == evc_id:
                continue
            for uni in untagged:
                circuit.check_no_tag_duplicate(uni)
732
733 1
    @listen_to("kytos/topology.link_up")
    def on_link_up(self, event):
        """Listen to topology link_up events and delegate to the handler.

        The work is done by ``handle_link_up``.
        """
        self.handle_link_up(event)
737
738 1
    def handle_link_up(self, event):
        """Forward a link up notification to the active circuits.

        Every EVC returned by get_evcs_by_svc_level that is enabled and not
        archived has its own handle_link_up called with the link taken from
        the event content, while the EVC lock is held.
        """
        link = event.content["link"]
        log.info("Event handle_link_up %s", link)
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_enabled() or evc.archived:
                continue
            with evc.lock:
                evc.handle_link_up(link)
745
746
    # Possibly replace this with interruptions?
747 1
    @listen_to(
        '.*.switch.interface.(link_up|link_down|created|deleted)'
    )
    def on_interface_link_change(self, event: KytosEvent):
        """
        Handler for interface link_up, link_down, created and deleted events.

        The event is passed on to handle_on_interface_link_change.
        """
        self.handle_on_interface_link_change(event)
755
756 1
    def handle_on_interface_link_change(self, event: KytosEvent):
        """
        Handler to sort interface events {link_(up, down), created, deleted}

        To avoid multiple database updates (link flap):
        Every interface is identified and processed in parallel.
        Once an interface event is received a timer is started.
        While the timer is running self._intf_events will be updated.
        After the time has passed the last received event will be processed.
        """
        iface = event.content.get("interface")
        with self._lock_interfaces[iface.id]:
            _now = event.timestamp
            # Return out of order events
            if (
                iface.id in self._intf_events
                and self._intf_events[iface.id]["event"].timestamp > _now
            ):
                return
            # NOTE(review): calling update() on a key that may be missing
            # presumably relies on self._intf_events being a
            # defaultdict(dict) - confirm where it is initialized.
            self._intf_events[iface.id].update({"event": event})
            # "last_acquired" is only set below, by the call that goes on to
            # wait; when it is present the event stored above is left for
            # that call to process.
            if "last_acquired" in self._intf_events[iface.id]:
                return
            self._intf_events[iface.id].update({"last_acquired": now()})
        # The wait happens outside the lock, so the stored event can still
        # be replaced by newer ones in the meantime.
        time.sleep(settings.UNI_STATE_CHANGE_DELAY)
        with self._lock_interfaces[iface.id]:
            # Process the most recently stored event, which is not
            # necessarily the one this call received.
            event = self._intf_events[iface.id]["event"]
            self._intf_events[iface.id].pop('last_acquired', None)
            # The last dotted segment of the event name is the event type
            _, _, event_type = event.name.rpartition('.')
            if event_type in ('link_up', 'created'):
                self.handle_interface_link_up(iface)
            elif event_type in ('link_down', 'deleted'):
                self.handle_interface_link_down(iface)
788
789 1
    def handle_interface_link_up(self, interface):
        """
        Handler for interface link_up events

        The event is logged once and then every EVC returned by
        get_evcs_by_svc_level has its handle_interface_link_up called with
        the interface while the EVC lock is held.

        Args:
            interface: Interface that went up; it is forwarded untouched
                to each EVC.
        """
        # Logged once, before the loop and outside of the EVC locks, in the
        # same way handle_interface_link_down does. Logging inside the loop
        # repeated the very same message for every single EVC.
        log.info("Event handle_interface_link_up %s", interface)
        for evc in self.get_evcs_by_svc_level():
            with evc.lock:
                evc.handle_interface_link_up(
                    interface
                )
799
800 1
    def handle_interface_link_down(self, interface):
        """
        Notify the circuits that an interface went down.

        After logging the event, each EVC returned by
        get_evcs_by_svc_level has its handle_interface_link_down called
        with the interface while the EVC lock is held.
        """
        log.info("Event handle_interface_link_down %s", interface)
        circuits = self.get_evcs_by_svc_level()
        for circuit in circuits:
            with circuit.lock:
                circuit.handle_interface_link_down(interface)
810
811 1
    @listen_to("kytos/topology.link_down")
    def on_link_down(self, event):
        """Listen to kytos/topology.link_down and call handle_link_down.

        The call is made while self._lock_handle_link_down is held.
        """
        with self._lock_handle_link_down:
            self.handle_link_down(event)
816
817 1
    def handle_link_down(self, event):  # pylint: disable=too-many-branches
        """Change circuits when a link is down or under maintenance.

        The EVCs returned by get_evcs_by_svc_level are split in three groups:

        - affected EVCs with a failover path not affected by the link: the
          failover path becomes the current path, the failover flows are
          sent to the flow manager in batches and a "redeployed_link_down"
          event is emitted;
        - affected EVCs without a usable failover path (or whose failover
          switch raised an error): an "evc_affected_by_link_down" event is
          emitted for each of them;
        - EVCs with only the failover path affected: setup_failover_path is
          called for them at the end.

        Args:
            event (KytosEvent): Event whose content holds the "link".
        """
        link = event.content["link"]
        log.info("Event handle_link_down %s", link)
        # Failover flows to be installed, grouped by switch dpid
        switch_flows = {}
        evcs_with_failover = []
        evcs_normal = []
        check_failover = []
        for evc in self.get_evcs_by_svc_level():
            with evc.lock:
                if evc.is_affected_by_link(link):
                    # if there is no failover path, or it is affected by the
                    # link as well, handle link down the traditional way
                    if (
                        not getattr(evc, 'failover_path', None) or
                        evc.is_failover_path_affected_by_link(link)
                    ):
                        evcs_normal.append(evc)
                        continue
                    try:
                        dpid_flows = evc.get_failover_flows()
                        # Promote the failover path to current path, keeping
                        # the previous one in old_path, and persist the EVC
                        evc.old_path = evc.current_path
                        evc.current_path = evc.failover_path
                        evc.failover_path = Path([])
                        evc.sync()
                    # pylint: disable=broad-except
                    except Exception:
                        # Any failure sends the EVC to the traditional flow;
                        # the traceback is flattened into a single log line
                        err = traceback.format_exc().replace("\n", ", ")
                        log.error(
                            "Ignore Failover path for "
                            f"{evc} due to error: {err}"
                        )
                        evcs_normal.append(evc)
                        continue
                    for dpid, flows in dpid_flows.items():
                        switch_flows.setdefault(dpid, [])
                        switch_flows[dpid].extend(flows)
                    evcs_with_failover.append(evc)
                elif evc.is_failover_path_affected_by_link(link):
                    check_failover.append(evc)

        # Send the failover flows in rounds: on each round every switch gets
        # at most `offset` flows, until no flows are left to be sent
        while switch_flows:
            # A falsy BATCH_SIZE becomes None, so the slice below takes
            # every flow of the switch at once
            offset = settings.BATCH_SIZE or None
            # Copy of the keys, since entries are deleted inside the loop
            switches = list(switch_flows.keys())
            for dpid in switches:
                emit_event(
                    self.controller,
                    context="kytos.flow_manager",
                    name="flows.install",
                    content={
                        "dpid": dpid,
                        "flow_dict": {"flows": switch_flows[dpid][:offset]},
                    }
                )
                if offset is None or offset >= len(switch_flows[dpid]):
                    del switch_flows[dpid]
                    continue
                # Keep the flows that were not sent for the next round
                switch_flows[dpid] = switch_flows[dpid][offset:]
            # Executed after every round, including the last one
            time.sleep(settings.BATCH_INTERVAL)

        for evc in evcs_normal:
            emit_event(
                self.controller,
                "evc_affected_by_link_down",
                content={"link": link} | map_evc_event_content(evc)
            )

        for evc in evcs_with_failover:
            emit_event(
                self.controller,
                "redeployed_link_down",
                content=map_evc_event_content(evc,  old_path=evc.old_path)
            )
            log.info(
                f"{evc} redeployed with failover due to link down {link.id}"
            )

        # After handling the hot path, check if new failover paths are needed.
        # Note that EVCs affected by link down will generate a KytosEvent for
        # deployed|redeployed, which will trigger the failover path setup.
        # Thus, we just need to further check the check_failover list
        for evc in check_failover:
            # Checked again here, this time before taking the EVC lock
            if evc.is_failover_path_affected_by_link(link):
                with evc.lock:
                    evc.setup_failover_path()
902
903 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
    def on_evc_affected_by_link_down(self, event):
        """Listen to kytos/mef_eline.evc_affected_by_link_down.

        The event is passed on to handle_evc_affected_by_link_down.
        """
        self.handle_evc_affected_by_link_down(event)
907
908 1
    def handle_evc_affected_by_link_down(self, event):
        """Redeploy one EVC that was affected by a link down.

        The event content must hold "evc_id" and "link". Nothing is done
        when the EVC is unknown or is not affected by the link anymore.
        Otherwise the EVC handles the link down and an event named
        "redeployed_link_down" (truthy result) or
        "error_redeploy_link_down" (falsy result) is emitted.
        """
        content = event.content
        evc = self.circuits.get(content["evc_id"])
        link = content['link']
        if not evc:
            return
        with evc.lock:
            if not evc.is_affected_by_link(link):
                return
            redeployed = evc.handle_link_down()
        if redeployed:
            log.info(f"{evc} redeployed due to link down {link.id}")
            event_name = "redeployed_link_down"
        else:
            event_name = "error_redeploy_link_down"
        emit_event(
            self.controller, event_name, content=map_evc_event_content(evc)
        )
924
925 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
    def on_evc_deployed(self, event):
        """Handle EVC deployed|redeployed_link_up|redeployed_link_down.

        The event is passed on to handle_evc_deployed.
        """
        self.handle_evc_deployed(event)
929
930 1
    def handle_evc_deployed(self, event):
        """Set up the failover path of an EVC that was just deployed.

        The EVC is looked up by the "evc_id" of the event content and, when
        found, its setup_failover_path is called with the optional
        "old_path" of the content while the EVC lock is held.
        """
        content = event.content
        circuit = self.circuits.get(content["evc_id"])
        previous_path = content.get("old_path")
        if circuit:
            with circuit.lock:
                circuit.setup_failover_path(previous_path)
938
939 1
    @listen_to("kytos/topology.topology_loaded")
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
        """Load EVCs once the topology is available.

        The event content is not used, it only triggers load_all_evcs.
        """
        self.load_all_evcs()
943
944 1
    def load_all_evcs(self):
        """Load into memory the stored EVCs that are not loaded yet.

        The circuits are read from mongodb and the ones whose id is missing
        in self.circuits are loaded with _load_evc. An "evcs_loaded" event
        carrying all the circuits read is emitted afterwards.
        """
        stored = self.mongo_controller.get_circuits()['circuits'].items()
        for evc_id, evc_dict in stored:
            if evc_id in self.circuits:
                continue
            self._load_evc(evc_dict)
        emit_event(
            self.controller,
            "evcs_loaded",
            content=dict(stored),
            timeout=1,
        )
952
953 1
    def _load_evc(self, circuit_dict):
954
        """Load one EVC from mongodb to memory."""
955 1
        try:
956 1
            evc = self._evc_from_dict(circuit_dict)
957 1
        except (ValueError, KytosTagError) as exception:
958 1
            log.error(
959
                f"Could not load EVC: dict={circuit_dict} error={exception}"
960
            )
961 1
            return None
962 1
        if evc.archived:
963 1
            return None
964
965 1
        self.circuits.setdefault(evc.id, evc)
966 1
        self.sched.add(evc)
967 1
        return evc
968
969 1
    @listen_to("kytos/flow_manager.flow.error")
    def on_flow_mod_error(self, event):
        """Handle flow mod errors related to an EVC.

        Listens to kytos/flow_manager.flow.error and passes the event on to
        handle_flow_mod_error.
        """
        self.handle_flow_mod_error(event)
973
974 1
    def handle_flow_mod_error(self, event):
        """Remove the flows of the EVC related to a failed flow mod.

        Only errors whose "error_command" is "add" are handled. The EVC is
        found through the id that EVC.get_id_from_cookie extracts from the
        cookie of the "flow" in the event content; when the EVC is known
        its current flows are removed while the EVC lock is held.
        """
        content = event.content
        flow = content["flow"]
        if content.get("error_command") != "add":
            return
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
        if not evc:
            return
        with evc.lock:
            evc.remove_current_flows()
984
985 1
    def _evc_dict_with_instances(self, evc_dict):
986
        """Convert some dict values to instance of EVC classes.
987
988
        This method will convert: [UNI, Link]
989
        """
990 1
        data = evc_dict.copy()  # Do not modify the original dict
991 1
        for attribute, value in data.items():
992
            # Get multiple attributes.
993
            # Ex: uni_a, uni_z
994 1
            if "uni" in attribute:
995 1
                try:
996 1
                    data[attribute] = self._uni_from_dict(value)
997 1
                except ValueError as exception:
998 1
                    result = "Error creating UNI: Invalid value"
999 1
                    raise ValueError(result) from exception
1000
1001 1
            if attribute == "circuit_scheduler":
1002 1
                data[attribute] = []
1003 1
                for schedule in value:
1004 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
1005
1006
            # Get multiple attributes.
1007
            # Ex: primary_links,
1008
            #     backup_links,
1009
            #     current_links_cache,
1010
            #     primary_links_cache,
1011
            #     backup_links_cache
1012 1
            if "links" in attribute:
1013 1
                data[attribute] = [
1014
                    self._link_from_dict(link) for link in value
1015
                ]
1016
1017
            # Ex: current_path,
1018
            #     primary_path,
1019
            #     backup_path
1020 1
            if "path" in attribute and attribute != "dynamic_backup_path":
1021 1
                data[attribute] = Path(
1022
                    [self._link_from_dict(link) for link in value]
1023
                )
1024
1025 1
        return data
1026
1027 1
    def _evc_from_dict(self, evc_dict):
        """Create an EVC from a python dict.

        The dict values are first converted by _evc_dict_with_instances and
        the table group of the NApp is passed to the EVC as "table_group".
        """
        evc_kwargs = {
            **self._evc_dict_with_instances(evc_dict),
            "table_group": self.table_group,
        }
        return EVC(self.controller, **evc_kwargs)
1031
1032 1
    def _uni_from_dict(self, uni_dict):
1033
        """Return a UNI object from python dict."""
1034 1
        if uni_dict is None:
1035 1
            return False
1036
1037 1
        interface_id = uni_dict.get("interface_id")
1038 1
        interface = self.controller.get_interface_by_id(interface_id)
1039 1
        if interface is None:
1040 1
            result = (
1041
                "Error creating UNI:"
1042
                + f"Could not instantiate interface {interface_id}"
1043
            )
1044 1
            raise ValueError(result) from ValueError
1045 1
        tag_convert = {1: "vlan"}
1046 1
        tag_dict = uni_dict.get("tag", None)
1047 1
        if tag_dict:
1048 1
            tag_type = tag_dict.get("tag_type")
1049 1
            tag_type = tag_convert.get(tag_type, tag_type)
1050 1
            tag_value = tag_dict.get("value")
1051 1
            if isinstance(tag_value, list):
1052 1
                tag_value = get_tag_ranges(tag_value)
1053 1
                mask_list = get_vlan_tags_and_masks(tag_value)
1054 1
                tag = TAGRange(tag_type, tag_value, mask_list)
1055
            else:
1056 1
                tag = TAG(tag_type, tag_value)
1057
        else:
1058 1
            tag = None
1059 1
        uni = UNI(interface, tag)
1060 1
        return uni
1061
1062 1
    def _link_from_dict(self, link_dict):
1063
        """Return a Link object from python dict."""
1064 1
        id_a = link_dict.get("endpoint_a").get("id")
1065 1
        id_b = link_dict.get("endpoint_b").get("id")
1066
1067 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
1068 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
1069 1
        if not endpoint_a:
1070 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
1071 1
            raise ValueError(error_msg)
1072 1
        if not endpoint_b:
1073
            error_msg = f"Could not get interface endpoint_b id {id_b}"
1074
            raise ValueError(error_msg)
1075
1076 1
        link = Link(endpoint_a, endpoint_b)
1077 1
        if "metadata" in link_dict:
1078 1
            link.extend_metadata(link_dict.get("metadata"))
1079
1080 1
        s_vlan = link.get_metadata("s_vlan")
1081 1
        if s_vlan:
1082 1
            tag = TAG.from_dict(s_vlan)
1083 1
            if tag is False:
1084
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
1085
                raise ValueError(error_msg)
1086 1
            link.update_metadata("s_vlan", tag)
1087 1
        return link
1088
1089 1
    def _find_evc_by_schedule_id(self, schedule_id):
1090
        """
1091
        Find an EVC and CircuitSchedule based on schedule_id.
1092
1093
        :param schedule_id: Schedule ID
1094
        :return: EVC and Schedule
1095
        """
1096 1
        circuits = self._get_circuits_buffer()
1097 1
        found_schedule = None
1098 1
        evc = None
1099
1100
        # pylint: disable=unused-variable
1101 1
        for c_id, circuit in circuits.items():
1102 1
            for schedule in circuit.circuit_scheduler:
1103 1
                if schedule.id == schedule_id:
1104 1
                    found_schedule = schedule
1105 1
                    evc = circuit
1106 1
                    break
1107 1
            if found_schedule:
1108 1
                break
1109 1
        return evc, found_schedule
1110
1111 1
    def _get_circuits_buffer(self):
1112
        """
1113
        Return the circuit buffer.
1114
1115
        If the buffer is empty, try to load data from mongodb.
1116
        """
1117 1
        if not self.circuits:
1118
            # Load circuits from mongodb to buffer
1119 1
            circuits = self.mongo_controller.get_circuits()['circuits']
1120 1
            for c_id, circuit in circuits.items():
1121 1
                evc = self._evc_from_dict(circuit)
1122 1
                self.circuits[c_id] = evc
1123 1
        return self.circuits
1124
1125
    # pylint: disable=attribute-defined-outside-init
1126 1
    @alisten_to("kytos/of_multi_table.enable_table")
    async def on_table_enabled(self, event):
        """Update the table group after a table was enabled.

        The "mef_eline" entry of the event content is used; nothing is
        done when it is missing or empty. A group outside of
        settings.TABLE_GROUP_ALLOWED is logged as an error and the whole
        update is discarded. Otherwise self.table_group is updated and
        published in a "kytos/mef_eline.enable_table" event.
        """
        requested = event.content.get("mef_eline", None)
        if not requested:
            return
        for group in requested:
            if group in settings.TABLE_GROUP_ALLOWED:
                continue
            log.error(f'The table group "{group}" is not allowed for '
                      f'mef_eline. Allowed table groups are '
                      f'{settings.TABLE_GROUP_ALLOWED}')
            return
        self.table_group.update(requested)
        await aemit_event(
            self.controller,
            "kytos/mef_eline.enable_table",
            {"group_table": self.table_group},
        )
1142