Passed
Pull Request — master (#438)
by Italo Valcy
03:43
created

build.main.Main.handle_interface_link_up()   A

Complexity

Conditions 3

Size

Total Lines 9
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 7.608

Importance

Changes 0
Metric Value
cc 3
eloc 6
nop 2
dl 0
loc 9
ccs 1
cts 5
cp 0.2
crap 7.608
rs 10
c 0
b 0
f 0
1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from collections import defaultdict
10 1
from threading import Lock
11 1
from typing import Optional
12
13 1
from pydantic import ValidationError
14
15 1
from kytos.core import KytosNApp, log, rest
16 1
from kytos.core.events import KytosEvent
17 1
from kytos.core.exceptions import KytosTagError
18 1
from kytos.core.helpers import (alisten_to, listen_to, load_spec, now,
19
                                validate_openapi)
20 1
from kytos.core.interface import TAG, UNI, TAGRange
21 1
from kytos.core.link import Link
22 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
23
                                 get_json_or_400)
24 1
from kytos.core.tag_ranges import get_tag_ranges
25 1
from napps.kytos.mef_eline import controllers, settings
26 1
from napps.kytos.mef_eline.exceptions import (DisabledSwitch,
27
                                              DuplicatedNoTagUNI, InvalidPath)
28 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
29
                                          Path)
30 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
31 1
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
32
                                         emit_event, get_vlan_tags_and_masks,
33
                                         map_evc_event_content)
34
35
36
# pylint: disable=too-many-public-methods
37 1
class Main(KytosNApp):
38
    """Main class of amlight/mef_eline NApp.
39
40
    This class is the entry point for this napp.
41
    """
42
43 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
44
45 1
    def setup(self):
        """Initialise the NApp state; stands in for ``__init__``.

        Called automatically by the controller when the application is
        loaded. Builds the scheduler and the persistence controller, creates
        the in-memory circuit buffer and the locks, registers the periodic
        execution interval and then calls ``load_all_evcs()``.
        """
        # Scheduler used for circuit events.
        self.sched = Scheduler()

        # Persistence controller used to save and load circuits.
        self.mongo_controller = self.get_eline_controller()
        self.mongo_controller.bootstrap_indexes()

        # Controller that will manage the dynamic paths.
        DynamicPathManager.set_controller(self.controller)

        # In-memory buffer of the EVCs created so far; every
        # create/update/delete must also be synced to mongodb.
        self.circuits = {}

        self._intf_events = defaultdict(dict)
        self._lock_interfaces = defaultdict(Lock)
        self.table_group = dict(epl=0, evpl=0)
        self._lock = Lock()
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

        self.load_all_evcs()
        self._topology_updated_at = None
75
76 1
    def get_evcs_by_svc_level(self, enable_filter: bool = True) -> list:
        """Return the buffered circuits ordered by priority.

        Circuits are sorted by descending service level and, within the same
        level, by ascending creation time.

        Args:
            enable_filter: when True, only circuits whose ``is_enabled()``
                is truthy are returned; when False, every buffered circuit
                is returned.

        In the future, as more ops are offloaded it should be get from the DB.
        """
        candidates = self.circuits.values()
        if enable_filter:
            candidates = [evc for evc in candidates if evc.is_enabled()]

        def priority(evc):
            # Negated level gives descending order with an ascending sort.
            return -evc.service_level, evc.creation_time

        return sorted(candidates, key=priority)
89
90 1
    @staticmethod
    def get_eline_controller():
        """Build and return a new ``controllers.ELineController``."""
        eline_controller = controllers.ELineController()
        return eline_controller
94
95 1
    def execute(self):
        """Run one round of the consistency routine.

        Rounds never overlap: if another round currently holds ``self._lock``
        this call returns immediately without doing anything.
        """
        # A non-blocking acquire makes "check and take" one atomic step.
        # Checking ``locked()`` first and acquiring afterwards would let a
        # second caller slip in between and queue up a redundant round.
        if not self._lock.acquire(blocking=False):
            return
        try:
            log.debug("Starting consistency routine")
            self.execute_consistency()
        finally:
            self._lock.release()
        log.debug("Finished consistency routine")
103
104 1
    def should_be_checked(self, circuit):
        """Tell whether ``circuit`` qualifies for the consistency check.

        A circuit qualifies when it is enabled but not active, its lock is
        free, it has neither a recently removed flow nor a recent update, its
        UNIs are active and it is either intra-switch or has a current path.

        Returns:
            bool: True when every condition holds, False otherwise.
        """
        if not circuit.is_enabled() or circuit.is_active():
            return False
        if circuit.lock.locked():
            return False
        if circuit.has_recent_removed_flow() or circuit.is_recent_updated():
            return False
        if not circuit.are_unis_active(self.controller.switches):
            return False
        # if a inter-switch EVC does not have current_path, it does not
        # make sense to run sdntrace on it
        return bool(circuit.is_intra_switch() or circuit.current_path)
120
121 1
    def execute_consistency(self):
        """Execute the consistency routine.

        Circuits selected by ``should_be_checked`` are traced in one batch.
        Those whose trace succeeds are activated and synced; the others get
        their ``execution_rounds`` incremented and are redeployed once that
        counter exceeds ``settings.WAIT_FOR_OLD_PATH``. Circuits found in
        mongodb that are not in the buffer are loaded at the end.
        """
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
        circuits_to_check = []
        for evc in self.get_evcs_by_svc_level(enable_filter=False):
            # Whatever is left in stored_circuits afterwards is unloaded.
            stored_circuits.pop(evc.id, None)
            if self.should_be_checked(evc):
                circuits_to_check.append(evc)

        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
        for circuit in circuits_to_check:
            if circuits_checked.get(circuit.id):
                circuit.execution_rounds = 0
                log.info(f"{circuit} enabled but inactive - activating")
                with circuit.lock:
                    circuit.activate()
                    circuit.sync()
                continue

            circuit.execution_rounds += 1
            if circuit.execution_rounds <= settings.WAIT_FOR_OLD_PATH:
                continue
            log.info(f"{circuit} enabled but inactive - redeploy")
            with circuit.lock:
                circuit.deploy()

        for circuit_id, stored_evc in stored_circuits.items():
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
            self._load_evc(stored_evc)
147
148 1
    def shutdown(self):
        """Hook executed when the NApp is unloaded.

        No cleanup is performed here; add any cleanup procedure to this
        method.
        """
153
154 1
    @rest("/v2/evc/", methods=["GET"])
    def list_circuits(self, request: Request) -> JSONResponse:
        """Endpoint to return the stored circuits.

        The ``archived`` query argument (default ``"false"``, lower-cased) is
        forwarded as the archived filter; every other query argument is
        forwarded as a metadata filter.
        """
        log.debug("list_circuits /v2/evc")
        query = request.query_params
        archived = query.get("archived", "false").lower()
        metadata = {
            key: value for key, value in query.items() if key != "archived"
        }
        found = self.mongo_controller.get_circuits(archived=archived,
                                                   metadata=metadata)
        return JSONResponse(found['circuits'])
169
170 1
    @rest("/v2/evc/schedule", methods=["GET"])
    def list_schedules(self, _request: Request) -> JSONResponse:
        """Endpoint to return all schedules stored for all circuits.

        Return a JSON with the following template:
        [{"schedule_id": <schedule_id>,
         "circuit_id": <circuit_id>,
         "schedule": <schedule object>}]

        When no circuit is stored at all, an empty JSON object is returned.
        """
        log.debug("list_schedules /v2/evc/schedule")
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
        if not circuits:
            return JSONResponse({}, status_code=200)

        status = 200
        # Circuits without a (non-empty) "circuit_scheduler" contribute
        # nothing to the result.
        result = [
            {
                "schedule_id": scheduler.get("id"),
                "circuit_id": circuit.get("id"),
                "schedule": scheduler,
            }
            for circuit in circuits
            for scheduler in circuit.get("circuit_scheduler") or ()
        ]

        log.debug("list_schedules result %s %s", result, status)
        return JSONResponse(result, status_code=status)
201
202 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
    def get_circuit(self, request: Request) -> JSONResponse:
        """Endpoint to return a single circuit, looked up by its id.

        Raises:
            HTTPException: 404 when no circuit is stored under that id.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("get_circuit /v2/evc/%s", circuit_id)
        circuit = self.mongo_controller.get_circuit(circuit_id)
        if circuit:
            log.debug("get_circuit result %s %s", circuit, 200)
            return JSONResponse(circuit, status_code=200)

        message = f"circuit_id {circuit_id} not found"
        log.debug("get_circuit result %s %s", message, 404)
        raise HTTPException(404, detail=message)
215
216
    # pylint: disable=too-many-branches, too-many-statements
217 1
    @rest("/v2/evc/", methods=["POST"])
    @validate_openapi(spec)
    def create_circuit(self, request: Request) -> JSONResponse:
        """Try to create a new circuit.

        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
        UNI_Z's requested C-VID are available from the interfaces' pools. This
        is checked when creating the UNI object.

        Then, E-Line NApp requests a primary and a backup path to the
        Pathfinder NApp using the attributes primary_links and backup_links
        submitted via REST

        # For each link composing paths in #3:
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
        #  - Using the S-VID obtained, generate abstract flow entries to be
        #    sent to FlowManager

        Push abstract flow entries to FlowManager and FlowManager pushes
        OpenFlow entries to datapaths

        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
        creation

        Finally, notify user of the status of its request.

        Returns:
            JSONResponse: ``{"circuit_id": <id>}`` with status 201.

        Raises:
            HTTPException: 400 when the payload cannot be turned into a valid
                EVC (invalid values, tags or paths, failed model validation);
                409 when a UNI component is disabled or a no-tag UNI is
                duplicated.
        """
        # Try to create the circuit object
        log.debug("create_circuit /v2/evc/")
        data = get_json_or_400(request, self.controller.loop)

        try:
            evc = self._evc_from_dict(data)
        except (ValueError, KytosTagError) as exception:
            log.debug("create_circuit result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception
        try:
            check_disabled_component(evc.uni_a, evc.uni_z)
        except DisabledSwitch as exception:
            log.debug("create_circuit result %s %s", exception, 409)
            raise HTTPException(
                    409,
                    detail=f"Path is not valid: {exception}"
                ) from exception

        # Static paths, when given, are validated the same way; the attribute
        # name doubles as the prefix of the error message.
        for path_name in ("primary_path", "backup_path"):
            path = getattr(evc, path_name)
            if not path:
                continue
            try:
                path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise HTTPException(
                    400,
                    detail=f"{path_name} is not valid: {exception}"
                ) from exception

        if not evc._tag_lists_equal():
            detail = "UNI_A and UNI_Z tag lists should be the same."
            raise HTTPException(400, detail=detail)

        try:
            evc._validate_has_primary_or_dynamic()
        except ValueError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        try:
            self._check_no_tag_duplication(evc.id, evc.uni_a, evc.uni_z)
        except DuplicatedNoTagUNI as exception:
            log.debug("create_circuit result %s %s", exception, 409)
            raise HTTPException(409, detail=str(exception)) from exception

        try:
            self._use_uni_tags(evc)
        except KytosTagError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        # save circuit
        try:
            evc.sync()
        except ValidationError as exception:
            # The UNI tags were reserved just above; hand them back so a
            # rejected request does not keep them allocated.
            evc.remove_uni_tags()
            raise HTTPException(400, detail=str(exception)) from exception

        # store circuit in dictionary
        self.circuits[evc.id] = evc

        # Schedule the circuit deploy
        self.sched.add(evc)

        # Circuit has no schedule, deploy now
        if not evc.circuit_scheduler:
            with evc.lock:
                evc.deploy()

        # Notify users
        result = {"circuit_id": evc.id}
        status = 201
        log.debug("create_circuit result %s %s", result, status)
        emit_event(self.controller, name="created",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)
330
331 1
    @staticmethod
    def _use_uni_tags(evc):
        """Reserve the VLAN of both UNIs of ``evc``.

        If reserving the UNI_Z VLAN fails with ``KytosTagError``, the UNI_A
        VLAN reserved a moment earlier is made available again and the error
        is re-raised.
        """
        first_uni = evc.uni_a
        evc._use_uni_vlan(first_uni)
        try:
            second_uni = evc.uni_z
            evc._use_uni_vlan(second_uni)
        except KytosTagError:
            evc.make_uni_vlan_available(first_uni)
            raise
341
342 1
    @listen_to('kytos/flow_manager.flow.removed')
    def on_flow_delete(self, event):
        """Listen for flow removal events and pass them to the handler.

        The work itself is done by ``handle_flow_delete``.
        """
        self.handle_flow_delete(event)
346
347 1
    def handle_flow_delete(self, event):
        """Record on the owning EVC that one of its flows was removed.

        The EVC id is derived from the cookie of ``event.content["flow"]``;
        nothing happens when no buffered circuit matches that id.
        """
        cookie = event.content["flow"].cookie
        evc_id = EVC.get_id_from_cookie(cookie)
        evc = self.circuits.get(evc_id)
        if not evc:
            return
        log.debug("Flow removed in EVC %s", evc.id)
        evc.set_flow_removed_at()
354
355 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update(self, request: Request) -> JSONResponse:
        """Update a circuit based on payload.

        The EVC attributes (creation_time, active, current_path,
        failover_path, _id, archived) can't be updated.

        Returns:
            JSONResponse: ``{<evc id>: <evc as dict>}`` with status 200.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer; 400 on
                ValueError, KytosTagError or ValidationError; 409 on
                DuplicatedNoTagUNI or DisabledSwitch.
        """
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = request.path_params["circuit_id"]
        log.debug("update /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("update result %s %s", result, 404)
            # Chain from the caught instance so the original lookup error
            # is kept as the cause.
            raise HTTPException(404, detail=result) from error

        try:
            updated_data = self._evc_dict_with_instances(data)
            self._check_no_tag_duplication(
                circuit_id, updated_data.get("uni_a"),
                updated_data.get("uni_z")
            )
            enable, redeploy = evc.update(**updated_data)
        except (ValueError, KytosTagError, ValidationError) as exception:
            log.debug("update result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception
        except DuplicatedNoTagUNI as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(409, detail=str(exception)) from exception
        except DisabledSwitch as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(
                    409,
                    detail=f"Path is not valid: {exception}"
                ) from exception

        if evc.is_active():
            if enable is False:  # disable if active
                with evc.lock:
                    evc.remove()
            elif redeploy is not None:  # redeploy if active
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        else:
            if enable is True:  # enable if inactive
                with evc.lock:
                    evc.deploy()
            elif evc.is_enabled() and redeploy:
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        result = {evc.id: evc.as_dict()}
        status = 200

        log.debug("update result %s %s", result, status)
        emit_event(self.controller, "updated",
                   content=map_evc_event_content(evc, **data))
        return JSONResponse(result, status_code=status)
416
417 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
    def delete_circuit(self, request: Request) -> JSONResponse:
        """Remove a circuit.

        First, the flows are removed from the switches, and then the EVC is
        deactivated, disabled, unscheduled, archived, has its UNI tags
        released and is synced.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits.pop(circuit_id)
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("delete_circuit result %s %s", result, 404)
            # Chain from the caught instance so the original lookup error
            # is kept as the cause.
            raise HTTPException(404, detail=result) from error

        log.info("Removing %s", evc)
        with evc.lock:
            evc.remove_current_flows()
            evc.remove_failover_flows(sync=False)
            evc.deactivate()
            evc.disable()
            self.sched.remove(evc)
            evc.archive()
            evc.remove_uni_tags()
            evc.sync()
        log.info("EVC removed. %s", evc)
        result = {"response": f"Circuit {circuit_id} removed"}
        status = 200

        log.debug("delete_circuit result %s %s", result, status)
        emit_event(self.controller, "deleted",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)
451
452 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
    def get_metadata(self, request: Request) -> JSONResponse:
        """Return the metadata of a buffered EVC as ``{"metadata": ...}``.

        Raises:
            HTTPException: 404 when the lookup raises ``KeyError``.
        """
        circuit_id = request.path_params["circuit_id"]
        try:
            evc = self.circuits[circuit_id]
            return JSONResponse({"metadata": evc.metadata})
        except KeyError as error:
            message = f"circuit_id {circuit_id} not found."
            raise HTTPException(404, detail=message) from error
465
466 1 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
    @validate_openapi(spec)
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to several EVCs at once.

        The payload carries ``circuit_ids`` plus the metadata to add. The
        database is updated first, then each buffered EVC is extended.

        Raises:
            HTTPException: 404, listing the ids that raised ``KeyError``.
        """
        payload = get_json_or_400(request, self.controller.loop)
        # After the pop, what is left in the payload is the metadata itself.
        circuit_ids = payload.pop("circuit_ids")

        self.mongo_controller.update_evcs(circuit_ids, payload, "add")

        failed_ids = []
        for evc_id in circuit_ids:
            try:
                self.circuits[evc_id].extend_metadata(payload)
            except KeyError:
                failed_ids.append(evc_id)

        if failed_ids:
            raise HTTPException(404, detail=failed_ids)
        return JSONResponse("Operation successful", status_code=201)
486
487 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
    @validate_openapi(spec)
    def add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to an EVC.

        The JSON payload must be an object; it is merged into the EVC
        metadata and the EVC is then synced.

        Raises:
            HTTPException: 400 when the payload is not a dict; 404 when the
                circuit is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        metadata = get_json_or_400(request, self.controller.loop)
        if not isinstance(metadata, dict):
            # f-string so the offending value is actually interpolated into
            # the error returned to the client.
            raise HTTPException(
                400,
                detail=f"Invalid metadata value: {metadata}"
            )
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error

        evc.extend_metadata(metadata)
        evc.sync()
        return JSONResponse("Operation successful", status_code=201)
506
507 1 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
    @validate_openapi(spec)
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
        """Delete one metadata key from several EVCs at once.

        The payload carries ``circuit_ids``; the key comes from the path. The
        database is updated first, then each buffered EVC.

        Raises:
            HTTPException: 404, listing the ids that raised ``KeyError``.
        """
        payload = get_json_or_400(request, self.controller.loop)
        key = request.path_params["key"]
        circuit_ids = payload.pop("circuit_ids")
        self.mongo_controller.update_evcs(circuit_ids, {key: ""}, "del")

        failed_ids = []
        for evc_id in circuit_ids:
            try:
                self.circuits[evc_id].remove_metadata(key)
            except KeyError:
                failed_ids.append(evc_id)

        if failed_ids:
            raise HTTPException(404, detail=failed_ids)
        return JSONResponse("Operation successful")
527
528 1
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
    def delete_metadata(self, request: Request) -> JSONResponse:
        """Delete one metadata key from an EVC and sync the EVC.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer.
        """
        path_params = request.path_params
        circuit_id = path_params["circuit_id"]
        key = path_params["key"]
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            message = f"circuit_id {circuit_id} not found."
            raise HTTPException(404, detail=message) from error

        evc.remove_metadata(key)
        evc.sync()
        return JSONResponse("Operation successful")
544
545 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
    def redeploy(self, request: Request) -> JSONResponse:
        """Endpoint to force the redeployment of an EVC.

        An enabled EVC has its current flows removed and is deployed again
        (status 202); a disabled EVC is left untouched (status 409).

        Raises:
            HTTPException: 404 when the circuit is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            # Chain from the caught instance so the original lookup error
            # is kept as the cause.
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found"
            ) from error
        if evc.is_enabled():
            with evc.lock:
                evc.remove_current_flows()
                evc.deploy()
            result = {"response": f"Circuit {circuit_id} redeploy received."}
            status = 202
        else:
            result = {"response": f"Circuit {circuit_id} is disabled."}
            status = 409

        return JSONResponse(result, status_code=status)
568
569 1
    @rest("/v2/evc/schedule/", methods=["POST"])
    @validate_openapi(spec)
    def create_schedule(self, request: Request) -> JSONResponse:
        """
        Create a new schedule for a given circuit.

        This service does not check if there are conflicts with another
        schedule.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }

        Raises:
            HTTPException: 404 when the circuit is not found.
        """
        log.debug("create_schedule /v2/evc/schedule/")
        payload = get_json_or_400(request, self.controller.loop)
        circuit_id = payload["circuit_id"]
        schedule_data = payload["schedule"]

        # Look the EVC up in the circuits buffer.
        evc = self._get_circuits_buffer().get(circuit_id)
        if not evc:
            result = f"circuit_id {circuit_id} not found"
            log.debug("create_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        new_schedule = CircuitSchedule.from_dict(schedule_data)

        # Start the schedule list when the EVC has none yet.
        if not evc.circuit_scheduler:
            evc.circuit_scheduler = []
        evc.circuit_scheduler.append(new_schedule)

        # Register the job, then persist the circuit to mongodb.
        self.sched.add_circuit_job(evc, new_schedule)
        evc.sync()

        result = new_schedule.as_dict()
        log.debug("create_schedule result %s %s", result, 201)
        return JSONResponse(result, status_code=201)
625
626 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update_schedule(self, request: Request) -> JSONResponse:
        """Update a schedule.

        Replace all attributes of the given schedule of an EVC; the schedule
        ID is preserved.
        Payload example:
            {
              "date": "2019-08-07T14:52:10.967Z",
              "interval": "string",
              "frequency": "1 * * *",
              "action": "create"
            }

        Raises:
            HTTPException: 404 when no schedule with that id is found.
        """
        payload = get_json_or_400(request, self.controller.loop)
        schedule_id = request.path_params["schedule_id"]
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)

        # Locate the EVC that owns this schedule.
        evc, old_schedule = self._find_evc_by_schedule_id(schedule_id)
        if not old_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("update_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        new_schedule = CircuitSchedule.from_dict(payload)
        new_schedule.id = old_schedule.id

        # Swap the old schedule for the modified one.
        evc.circuit_scheduler.remove(old_schedule)
        evc.circuit_scheduler.append(new_schedule)

        # Replace the scheduled job as well, then save the EVC to mongodb.
        self.sched.cancel_job(old_schedule.id)
        self.sched.add_circuit_job(evc, new_schedule)
        evc.sync()

        result = new_schedule.as_dict()
        log.debug("update_schedule result %s %s", result, 200)
        return JSONResponse(result, status_code=200)
673
674 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
    def delete_schedule(self, request: Request) -> JSONResponse:
        """Remove a circuit schedule.

        Remove the schedule from its EVC, cancel the matching scheduler job
        and sync the EVC.

        Raises:
            HTTPException: 404 when no schedule with that id is found.
        """
        schedule_id = request.path_params["schedule_id"]
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
        evc, old_schedule = self._find_evc_by_schedule_id(schedule_id)

        if not old_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("delete_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # Drop the schedule, cancel its job and save the EVC to mongodb.
        evc.circuit_scheduler.remove(old_schedule)
        self.sched.cancel_job(old_schedule.id)
        evc.sync()

        result = "Schedule removed"
        log.debug("delete_schedule result %s %s", result, 200)
        return JSONResponse(result, status_code=200)
705
706 1
    def _check_no_tag_duplication(
        self,
        evc_id: str,
        uni_a: Optional[UNI] = None,
        uni_z: Optional[UNI] = None
    ):
        """Check the given UNIs without a tag against the other circuits.

        Every non-archived circuit other than ``evc_id`` is asked, through
        ``check_no_tag_duplicate``, to verify each given UNI whose
        ``user_tag`` is None. Presumably that call raises
        DuplicatedNoTagUNI on a duplicate - callers catch that exception.

        Args:
            evc_id: id of the EVC being analyzed; it is skipped in the scan.
            uni_a: UNI A of the EVC, if any.
            uni_z: UNI Z of the EVC, if any.
        """
        # No UNIs
        if not uni_a and not uni_z:
            return

        # Nothing to verify when no given UNI has an empty user_tag.
        if not ((uni_a and not uni_a.user_tag)
                or (uni_z and not uni_z.user_tag)):
            return

        untagged_unis = [
            uni for uni in (uni_a, uni_z)
            if uni and uni.user_tag is None
        ]
        # Iterate over a copy of the buffer.
        for circuit in self.circuits.copy().values():
            if circuit.archived or circuit._id == evc_id:
                continue
            for uni in untagged_unis:
                circuit.check_no_tag_duplicate(uni)
731
732 1
    @listen_to("kytos/topology.link_up")
    def on_link_up(self, event):
        """Listen for link_up events and pass them to ``handle_link_up``."""
        self.handle_link_up(event)
736
737 1
    def handle_link_up(self, event):
        """Notify enabled, non-archived EVCs that a link came up.

        EVCs are visited in the order given by get_evcs_by_svc_level() and
        each one is handled while holding its own lock.
        """
        link = event.content["link"]
        log.info("Event handle_link_up %s", link)
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_enabled() or evc.archived:
                continue
            with evc.lock:
                evc.handle_link_up(link)
744
745
    # Possibly replace this with interruptions?
746 1
    @listen_to(
        '.*.switch.interface.(link_up|link_down|created|deleted)'
    )
    def on_interface_link_change(self, event: KytosEvent):
        """
        Listener for interface link_up, link_down, created and deleted
        events; delegates to handle_on_interface_link_change.
        """
        self.handle_on_interface_link_change(event)
754
755 1
    def handle_on_interface_link_change(self, event: KytosEvent):
        """
        Handler to sort interface events {link_(up, down), created, deleted}

        To avoid multiple database updates (link flap):
        - Events are tracked per interface, each under its own lock.
        - The first event received for an interface starts a wait of
          settings.UNI_STATE_CHANGE_DELAY.
        - While waiting, newer events only replace the one stored in
          self._intf_events; out of order events are dropped.
        - After the wait, only the last stored event is processed.
        """
        iface = event.content.get("interface")
        intf_id = iface.id
        with self._lock_interfaces[intf_id]:
            # Drop events older than the one already stored. Membership is
            # tested before indexing, as the original code does.
            if (
                intf_id in self._intf_events
                and self._intf_events[intf_id]["event"].timestamp
                > event.timestamp
            ):
                return
            self._intf_events[intf_id].update({"event": event})
            # A wait is already in progress for this interface; it will
            # process the event stored above.
            if "last_acquired" in self._intf_events[intf_id]:
                return
            self._intf_events[intf_id].update({"last_acquired": now()})

        # The lock is released while waiting so newer events can be stored.
        time.sleep(settings.UNI_STATE_CHANGE_DELAY)

        with self._lock_interfaces[intf_id]:
            latest = self._intf_events[intf_id]["event"]
            self._intf_events[intf_id].pop('last_acquired', None)
            event_type = latest.name.rpartition('.')[-1]
            if event_type in ('link_up', 'created'):
                self.handle_interface_link_up(iface)
            elif event_type in ('link_down', 'deleted'):
                self.handle_interface_link_down(iface)
787
788 1
    def handle_interface_link_up(self, interface):
        """
        Handler for interface link_up events.

        Calls handle_interface_link_up on every EVC returned by
        get_evcs_by_svc_level(), holding each EVC's lock during its call.

        Args:
            interface: Interface the event refers to; passed as is to
                every EVC.
        """
        # Log once per event, before the loop and outside any EVC lock,
        # the same way handle_interface_link_down does. Logging inside the
        # loop repeated the same message for every EVC.
        log.info("Event handle_interface_link_up %s", interface)
        for evc in self.get_evcs_by_svc_level():
            with evc.lock:
                evc.handle_interface_link_up(interface)
798
799 1
    def handle_interface_link_down(self, interface):
        """
        Handler for interface link_down events.

        Logs the event once, then calls handle_interface_link_down on
        every EVC from get_evcs_by_svc_level(), one EVC lock at a time.
        """
        log.info("Event handle_interface_link_down %s", interface)
        for circuit in self.get_evcs_by_svc_level():
            with circuit.lock:
                circuit.handle_interface_link_down(interface)
809
810 1
    @listen_to("kytos/topology.link_down", pool="dynamic_single")
    def on_link_down(self, event):
        """Forward topology link_down events to handle_link_down."""
        self.handle_link_down(event)
814
815 1
    def handle_link_down(self, event):  # pylint: disable=too-many-branches
        """Change circuits when a link is down or under maintenance.

        For every EVC for which is_affected_by_link(link) is true:
        - if it has a failover path that is not affected by the link, that
          path becomes the current path right away; its failover flows are
          sent to flow_manager in batches and a "redeployed_link_down"
          event is emitted;
        - otherwise, or if the switch-over raises, an
          "evc_affected_by_link_down" event is emitted for it.

        EVCs that only have their failover path affected are checked again
        at the end and get setup_failover_path() called.
        """
        link = event.content["link"]
        log.info("Event handle_link_down %s", link)
        flows_by_dpid = {}
        failover_evcs = []
        regular_evcs = []
        failover_recheck = []
        for evc in self.get_evcs_by_svc_level():
            with evc.lock:
                if not evc.is_affected_by_link(link):
                    if evc.is_failover_path_affected_by_link(link):
                        failover_recheck.append(evc)
                    continue
                # if there is no usable failover path, handle link down
                # the traditional way
                if (
                    not getattr(evc, 'failover_path', None) or
                    evc.is_failover_path_affected_by_link(link)
                ):
                    regular_evcs.append(evc)
                    continue
                try:
                    dpid_flows = evc.get_failover_flows()
                    evc.old_path = evc.current_path
                    evc.current_path = evc.failover_path
                    evc.failover_path = Path([])
                    evc.sync()
                # pylint: disable=broad-except
                except Exception:
                    err = traceback.format_exc().replace("\n", ", ")
                    log.error(
                        "Ignore Failover path for "
                        f"{evc} due to error: {err}"
                    )
                    regular_evcs.append(evc)
                    continue
                for dpid, flows in dpid_flows.items():
                    flows_by_dpid.setdefault(dpid, []).extend(flows)
                failover_evcs.append(evc)

        # Send the failover flows: on each round every switch gets at most
        # settings.BATCH_SIZE flows (all of them when BATCH_SIZE is falsy).
        while flows_by_dpid:
            batch_size = settings.BATCH_SIZE or None
            for dpid in list(flows_by_dpid):
                pending = flows_by_dpid[dpid]
                emit_event(
                    self.controller,
                    context="kytos.flow_manager",
                    name="flows.install",
                    content={
                        "dpid": dpid,
                        "flow_dict": {"flows": pending[:batch_size]},
                    }
                )
                if batch_size is None or batch_size >= len(pending):
                    del flows_by_dpid[dpid]
                else:
                    flows_by_dpid[dpid] = pending[batch_size:]
            time.sleep(settings.BATCH_INTERVAL)

        for evc in regular_evcs:
            emit_event(
                self.controller,
                "evc_affected_by_link_down",
                content={"link": link} | map_evc_event_content(evc)
            )

        for evc in failover_evcs:
            emit_event(
                self.controller,
                "redeployed_link_down",
                content=map_evc_event_content(evc, old_path=evc.old_path)
            )
            log.info(
                f"{evc} redeployed with failover due to link down {link.id}"
            )

        # After handling the hot path, check if new failover paths are
        # needed. Note that EVCs affected by link down will generate a
        # KytosEvent for deployed|redeployed, which will trigger the
        # failover path setup. Thus, only the EVCs collected for a failover
        # recheck need further handling here.
        for evc in failover_recheck:
            if evc.is_failover_path_affected_by_link(link):
                with evc.lock:
                    evc.setup_failover_path()
900
901 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
    def on_evc_affected_by_link_down(self, event):
        """Forward evc_affected_by_link_down events to their handler."""
        self.handle_evc_affected_by_link_down(event)
905
906 1
    def handle_evc_affected_by_link_down(self, event):
        """Redeploy one EVC after a link down and report the outcome.

        Emits "redeployed_link_down" when evc.handle_link_down() returns a
        truthy value and "error_redeploy_link_down" otherwise. Nothing is
        done when the EVC is unknown or is not affected by the link.
        """
        content = event.content
        evc = self.circuits.get(content["evc_id"])
        link = content['link']
        if not evc:
            return
        with evc.lock:
            if not evc.is_affected_by_link(link):
                return
            redeployed = evc.handle_link_down()
        if redeployed:
            log.info(f"{evc} redeployed due to link down {link.id}")
            event_name = "redeployed_link_down"
        else:
            event_name = "error_redeploy_link_down"
        emit_event(self.controller, event_name,
                   content=map_evc_event_content(evc))
922
923 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
    def on_evc_deployed(self, event):
        """Forward EVC deployed|redeployed_link_(up|down) events."""
        self.handle_evc_deployed(event)
927
928 1
    def handle_evc_deployed(self, event):
        """Set up the failover path of an EVC named in a deployed event.

        The optional "old_path" of the event content is forwarded to
        evc.setup_failover_path(). Events for unknown EVCs are ignored.
        """
        content = event.content
        evc = self.circuits.get(content["evc_id"])
        previous_path = content.get("old_path")
        if evc:
            with evc.lock:
                evc.setup_failover_path(previous_path)
936
937 1
    @listen_to("kytos/topology.topology_loaded")
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
        """Load all EVCs when the topology_loaded event is received."""
        self.load_all_evcs()
941
942 1
    def load_all_evcs(self):
        """Load into memory every stored EVC that is not loaded yet.

        Once done, an "evcs_loaded" event carrying all the stored circuits
        is emitted.
        """
        stored = self.mongo_controller.get_circuits()['circuits']
        for circuit_id, circuit in stored.items():
            if circuit_id in self.circuits:
                continue
            self._load_evc(circuit)
        emit_event(self.controller, "evcs_loaded", content=dict(stored),
                   timeout=1)
950
951 1
    def _load_evc(self, circuit_dict):
        """Load one EVC from mongodb to memory.

        Returns:
            The EVC that was built and added to the scheduler, or None
            when the dict cannot be converted (the error is logged) or the
            EVC is archived.
        """
        try:
            evc = self._evc_from_dict(circuit_dict)
        except (ValueError, KytosTagError) as err:
            log.error(
                f"Could not load EVC: dict={circuit_dict} error={err}"
            )
            return None

        if not evc.archived:
            # setdefault keeps an EVC already stored under the same id
            self.circuits.setdefault(evc.id, evc)
            self.sched.add(evc)
            return evc
        return None
966
967 1
    @listen_to("kytos/flow_manager.flow.error")
    def on_flow_mod_error(self, event):
        """Forward flow_manager flow.error events to handle_flow_mod_error."""
        self.handle_flow_mod_error(event)
971
972 1
    def handle_flow_mod_error(self, event):
        """Remove the current flows of the EVC related to a failed flow.

        Only errors whose "error_command" is "add" are considered; the EVC
        is looked up from the cookie of the flow.
        """
        flow = event.content["flow"]
        if event.content.get("error_command") != "add":
            return
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
        if not evc:
            return
        with evc.lock:
            evc.remove_current_flows()
982
983 1
    def _evc_dict_with_instances(self, evc_dict):
984
        """Convert some dict values to instance of EVC classes.
985
986
        This method will convert: [UNI, Link]
987
        """
988 1
        data = evc_dict.copy()  # Do not modify the original dict
989 1
        for attribute, value in data.items():
990
            # Get multiple attributes.
991
            # Ex: uni_a, uni_z
992 1
            if "uni" in attribute:
993 1
                try:
994 1
                    data[attribute] = self._uni_from_dict(value)
995 1
                except ValueError as exception:
996 1
                    result = "Error creating UNI: Invalid value"
997 1
                    raise ValueError(result) from exception
998
999 1
            if attribute == "circuit_scheduler":
1000 1
                data[attribute] = []
1001 1
                for schedule in value:
1002 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
1003
1004
            # Get multiple attributes.
1005
            # Ex: primary_links,
1006
            #     backup_links,
1007
            #     current_links_cache,
1008
            #     primary_links_cache,
1009
            #     backup_links_cache
1010 1
            if "links" in attribute:
1011 1
                data[attribute] = [
1012
                    self._link_from_dict(link) for link in value
1013
                ]
1014
1015
            # Ex: current_path,
1016
            #     primary_path,
1017
            #     backup_path
1018 1
            if "path" in attribute and attribute != "dynamic_backup_path":
1019 1
                data[attribute] = Path(
1020
                    [self._link_from_dict(link) for link in value]
1021
                )
1022
1023 1
        return data
1024
1025 1
    def _evc_from_dict(self, evc_dict):
        """Build an EVC from its dict form, passing self.table_group."""
        kwargs = self._evc_dict_with_instances(evc_dict)
        kwargs["table_group"] = self.table_group
        return EVC(self.controller, **kwargs)
1029
1030 1
    def _uni_from_dict(self, uni_dict):
1031
        """Return a UNI object from python dict."""
1032 1
        if uni_dict is None:
1033 1
            return False
1034
1035 1
        interface_id = uni_dict.get("interface_id")
1036 1
        interface = self.controller.get_interface_by_id(interface_id)
1037 1
        if interface is None:
1038 1
            result = (
1039
                "Error creating UNI:"
1040
                + f"Could not instantiate interface {interface_id}"
1041
            )
1042 1
            raise ValueError(result) from ValueError
1043 1
        tag_convert = {1: "vlan"}
1044 1
        tag_dict = uni_dict.get("tag", None)
1045 1
        if tag_dict:
1046 1
            tag_type = tag_dict.get("tag_type")
1047 1
            tag_type = tag_convert.get(tag_type, tag_type)
1048 1
            tag_value = tag_dict.get("value")
1049 1
            if isinstance(tag_value, list):
1050 1
                tag_value = get_tag_ranges(tag_value)
1051 1
                mask_list = get_vlan_tags_and_masks(tag_value)
1052 1
                tag = TAGRange(tag_type, tag_value, mask_list)
1053
            else:
1054 1
                tag = TAG(tag_type, tag_value)
1055
        else:
1056 1
            tag = None
1057 1
        uni = UNI(interface, tag)
1058 1
        return uni
1059
1060 1
    def _link_from_dict(self, link_dict):
        """Return a Link object from python dict.

        Both endpoints are resolved through the controller. Metadata found
        in the dict is added to the link, and a truthy "s_vlan" metadata
        is replaced by the TAG built from it.

        Raises:
            ValueError: if an endpoint interface cannot be found or the
                "s_vlan" tag cannot be instantiated.
        """
        id_a = link_dict.get("endpoint_a").get("id")
        id_b = link_dict.get("endpoint_b").get("id")

        endpoint_a = self.controller.get_interface_by_id(id_a)
        endpoint_b = self.controller.get_interface_by_id(id_b)
        if not endpoint_a:
            raise ValueError(f"Could not get interface endpoint_a id {id_a}")
        if not endpoint_b:
            raise ValueError(f"Could not get interface endpoint_b id {id_b}")

        link = Link(endpoint_a, endpoint_b)
        if "metadata" in link_dict:
            link.extend_metadata(link_dict.get("metadata"))

        s_vlan = link.get_metadata("s_vlan")
        if not s_vlan:
            return link
        tag = TAG.from_dict(s_vlan)
        if tag is False:
            raise ValueError(f"Could not instantiate tag from dict {s_vlan}")
        link.update_metadata("s_vlan", tag)
        return link
1086
1087 1
    def _find_evc_by_schedule_id(self, schedule_id):
1088
        """
1089
        Find an EVC and CircuitSchedule based on schedule_id.
1090
1091
        :param schedule_id: Schedule ID
1092
        :return: EVC and Schedule
1093
        """
1094 1
        circuits = self._get_circuits_buffer()
1095 1
        found_schedule = None
1096 1
        evc = None
1097
1098
        # pylint: disable=unused-variable
1099 1
        for c_id, circuit in circuits.items():
1100 1
            for schedule in circuit.circuit_scheduler:
1101 1
                if schedule.id == schedule_id:
1102 1
                    found_schedule = schedule
1103 1
                    evc = circuit
1104 1
                    break
1105 1
            if found_schedule:
1106 1
                break
1107 1
        return evc, found_schedule
1108
1109 1
    def _get_circuits_buffer(self):
1110
        """
1111
        Return the circuit buffer.
1112
1113
        If the buffer is empty, try to load data from mongodb.
1114
        """
1115 1
        if not self.circuits:
1116
            # Load circuits from mongodb to buffer
1117 1
            circuits = self.mongo_controller.get_circuits()['circuits']
1118 1
            for c_id, circuit in circuits.items():
1119 1
                evc = self._evc_from_dict(circuit)
1120 1
                self.circuits[c_id] = evc
1121 1
        return self.circuits
1122
1123
    # pylint: disable=attribute-defined-outside-init
1124 1
    @alisten_to("kytos/of_multi_table.enable_table")
    async def on_table_enabled(self, event):
        """Update table_group from an of_multi_table enable_table event.

        The event is ignored when it has no "mef_eline" content. If any
        group is not in settings.TABLE_GROUP_ALLOWED an error is logged
        and nothing is updated. Otherwise self.table_group is updated and
        a "kytos/mef_eline.enable_table" event is emitted.
        """
        requested = event.content.get("mef_eline", None)
        if not requested:
            return
        for group in requested:
            if group in settings.TABLE_GROUP_ALLOWED:
                continue
            log.error(f'The table group "{group}" is not allowed for '
                      f'mef_eline. Allowed table groups are '
                      f'{settings.TABLE_GROUP_ALLOWED}')
            return
        self.table_group.update(requested)
        await aemit_event(
            self.controller,
            "kytos/mef_eline.enable_table",
            {"group_table": self.table_group},
        )
1140