Passed
Pull Request — master (#433)
by
unknown
03:46
created

build.main.Main.on_evc_deployed()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1.037

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 4
ccs 2
cts 3
cp 0.6667
rs 10
c 0
b 0
f 0
cc 1
nop 2
crap 1.037
1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from collections import defaultdict
10 1
from threading import Lock
11 1
from typing import Optional
12
13 1
from pydantic import ValidationError
14
15 1
from kytos.core import KytosNApp, log, rest
16 1
from kytos.core.events import KytosEvent
17 1
from kytos.core.exceptions import KytosTagError
18 1
from kytos.core.helpers import (alisten_to, listen_to, load_spec, now,
19
                                validate_openapi)
20 1
from kytos.core.interface import TAG, UNI, TAGRange
21 1
from kytos.core.link import Link
22 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
23
                                 get_json_or_400)
24 1
from kytos.core.tag_ranges import get_tag_ranges
25 1
from napps.kytos.mef_eline import controllers, settings
26 1
from napps.kytos.mef_eline.exceptions import (DisabledSwitch,
27
                                              DuplicatedNoTagUNI, InvalidPath)
28 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
29
                                          Path)
30 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
31 1
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
32
                                         emit_event, get_vlan_tags_and_masks,
33
                                         map_evc_event_content)
34
35
36
# pylint: disable=too-many-public-methods
37 1
class Main(KytosNApp):
38
    """Main class of amlight/mef_eline NApp.
39
40
    This class is the entry point for this napp.
41
    """
42
43 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
44
45 1
    def setup(self):
46
        """Replace the '__init__' method for the KytosNApp subclass.
47
48
        The setup method is automatically called by the controller when your
49
        application is loaded.
50
51
        So, if you have any setup routine, insert it here.
52
        """
53
        # object used to scheduler circuit events
54 1
        self.sched = Scheduler()
55
56
        # object to save and load circuits
57 1
        self.mongo_controller = self.get_eline_controller()
58 1
        self.mongo_controller.bootstrap_indexes()
59
60
        # set the controller that will manager the dynamic paths
61 1
        DynamicPathManager.set_controller(self.controller)
62
63
        # dictionary of EVCs created. It acts as a circuit buffer.
64
        # Every create/update/delete must be synced to mongodb.
65 1
        self.circuits = {}
66
67 1
        self._intf_events = defaultdict(dict)
68 1
        self._lock_interfaces = defaultdict(Lock)
69 1
        self.table_group = {"epl": 0, "evpl": 0}
70 1
        self._lock = Lock()
71 1
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)
72
73 1
        self.load_all_evcs()
74 1
        self._topology_updated_at = None
75
76 1
    def get_evcs_by_svc_level(self) -> list:
77
        """Get circuits sorted by desc service level and asc creation_time.
78
79
        In the future, as more ops are offloaded it should be get from the DB.
80
        """
81 1
        return sorted(self.circuits.values(),
82
                      key=lambda x: (-x.service_level, x.creation_time))
83
84 1
    @staticmethod
85 1
    def get_eline_controller():
86
        """Return the ELineController instance."""
87
        return controllers.ELineController()
88
89 1
    def execute(self):
90
        """Execute once when the napp is running."""
91 1
        if self._lock.locked():
92 1
            return
93 1
        log.debug("Starting consistency routine")
94 1
        with self._lock:
95 1
            self.execute_consistency()
96 1
        log.debug("Finished consistency routine")
97
98 1
    def should_be_checked(self, circuit):
99
        "Verify if the circuit meets the necessary conditions to be checked"
100
        # pylint: disable=too-many-boolean-expressions
101 1
        if (
102
                circuit.is_enabled()
103
                and not circuit.is_active()
104
                and not circuit.lock.locked()
105
                and not circuit.has_recent_removed_flow()
106
                and not circuit.is_recent_updated()
107
                and circuit.are_unis_active(self.controller.switches)
108
                # if a inter-switch EVC does not have current_path, it does not
109
                # make sense to run sdntrace on it
110
                and (circuit.is_intra_switch() or circuit.current_path)
111
                ):
112 1
            return True
113
        return False
114
115 1
    def execute_consistency(self):
116
        """Execute consistency routine."""
117 1
        circuits_to_check = []
118 1
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
119 1
        for circuit in self.get_evcs_by_svc_level():
120 1
            stored_circuits.pop(circuit.id, None)
121 1
            if self.should_be_checked(circuit):
122 1
                circuits_to_check.append(circuit)
123 1
        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
124 1
        for circuit in circuits_to_check:
125 1
            is_checked = circuits_checked.get(circuit.id)
126 1
            if is_checked:
127 1
                circuit.execution_rounds = 0
128 1
                log.info(f"{circuit} enabled but inactive - activating")
129 1
                with circuit.lock:
130 1
                    circuit.activate()
131 1
                    circuit.sync()
132
            else:
133 1
                circuit.execution_rounds += 1
134 1
                if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
135 1
                    log.info(f"{circuit} enabled but inactive - redeploy")
136 1
                    with circuit.lock:
137 1
                        circuit.deploy()
138 1
        for circuit_id in stored_circuits:
139 1
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
140 1
            self._load_evc(stored_circuits[circuit_id])
141
142 1
    def shutdown(self):
143
        """Execute when your napp is unloaded.
144
145
        If you have some cleanup procedure, insert it here.
146
        """
147
148 1
    @rest("/v2/evc/", methods=["GET"])
149 1
    def list_circuits(self, request: Request) -> JSONResponse:
150
        """Endpoint to return circuits stored.
151
152
        archive query arg if defined (not null) will be filtered
153
        accordingly, by default only non archived evcs will be listed
154
        """
155 1
        log.debug("list_circuits /v2/evc")
156 1
        args = request.query_params
157 1
        archived = args.get("archived", "false").lower()
158 1
        args = {k: v for k, v in args.items() if k not in {"archived"}}
159 1
        circuits = self.mongo_controller.get_circuits(archived=archived,
160
                                                      metadata=args)
161 1
        circuits = circuits['circuits']
162 1
        return JSONResponse(circuits)
163
164 1
    @rest("/v2/evc/schedule", methods=["GET"])
165 1
    def list_schedules(self, _request: Request) -> JSONResponse:
166
        """Endpoint to return all schedules stored for all circuits.
167
168
        Return a JSON with the following template:
169
        [{"schedule_id": <schedule_id>,
170
         "circuit_id": <circuit_id>,
171
         "schedule": <schedule object>}]
172
        """
173 1
        log.debug("list_schedules /v2/evc/schedule")
174 1
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
175 1
        if not circuits:
176 1
            result = {}
177 1
            status = 200
178 1
            return JSONResponse(result, status_code=status)
179
180 1
        result = []
181 1
        status = 200
182 1
        for circuit in circuits:
183 1
            circuit_scheduler = circuit.get("circuit_scheduler")
184 1
            if circuit_scheduler:
185 1
                for scheduler in circuit_scheduler:
186 1
                    value = {
187
                        "schedule_id": scheduler.get("id"),
188
                        "circuit_id": circuit.get("id"),
189
                        "schedule": scheduler,
190
                    }
191 1
                    result.append(value)
192
193 1
        log.debug("list_schedules result %s %s", result, status)
194 1
        return JSONResponse(result, status_code=status)
195
196 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
197 1
    def get_circuit(self, request: Request) -> JSONResponse:
198
        """Endpoint to return a circuit based on id."""
199 1
        circuit_id = request.path_params["circuit_id"]
200 1
        log.debug("get_circuit /v2/evc/%s", circuit_id)
201 1
        circuit = self.mongo_controller.get_circuit(circuit_id)
202 1
        if not circuit:
203 1
            result = f"circuit_id {circuit_id} not found"
204 1
            log.debug("get_circuit result %s %s", result, 404)
205 1
            raise HTTPException(404, detail=result)
206 1
        status = 200
207 1
        log.debug("get_circuit result %s %s", circuit, status)
208 1
        return JSONResponse(circuit, status_code=status)
209
210
    # pylint: disable=too-many-branches, too-many-statements
211 1
    @rest("/v2/evc/", methods=["POST"])
212 1
    @validate_openapi(spec)
213 1
    def create_circuit(self, request: Request) -> JSONResponse:
214
        """Try to create a new circuit.
215
216
        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
217
        UNI_Z's requested C-VID are available from the interfaces' pools. This
218
        is checked when creating the UNI object.
219
220
        Then, E-Line NApp requests a primary and a backup path to the
221
        Pathfinder NApp using the attributes primary_links and backup_links
222
        submitted via REST
223
224
        # For each link composing paths in #3:
225
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
226
        #  - Using the S-VID obtained, generate abstract flow entries to be
227
        #    sent to FlowManager
228
229
        Push abstract flow entries to FlowManager and FlowManager pushes
230
        OpenFlow entries to datapaths
231
232
        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
233
        creation
234
235
        Finnaly, notify user of the status of its request.
236
        """
237
        # Try to create the circuit object
238 1
        log.debug("create_circuit /v2/evc/")
239 1
        data = get_json_or_400(request, self.controller.loop)
240
241 1
        try:
242 1
            evc = self._evc_from_dict(data)
243 1
        except (ValueError, KytosTagError) as exception:
244 1
            log.debug("create_circuit result %s %s", exception, 400)
245 1
            raise HTTPException(400, detail=str(exception)) from exception
246 1
        try:
247 1
            check_disabled_component(evc.uni_a, evc.uni_z)
248 1
        except DisabledSwitch as exception:
249 1
            log.debug("create_circuit result %s %s", exception, 409)
250 1
            raise HTTPException(
251
                    409,
252
                    detail=f"Path is not valid: {exception}"
253
                ) from exception
254
255 1
        if evc.primary_path:
256 1
            try:
257 1
                evc.primary_path.is_valid(
258
                    evc.uni_a.interface.switch,
259
                    evc.uni_z.interface.switch,
260
                    bool(evc.circuit_scheduler),
261
                )
262 1
            except InvalidPath as exception:
263 1
                raise HTTPException(
264
                    400,
265
                    detail=f"primary_path is not valid: {exception}"
266
                ) from exception
267 1
        if evc.backup_path:
268 1
            try:
269 1
                evc.backup_path.is_valid(
270
                    evc.uni_a.interface.switch,
271
                    evc.uni_z.interface.switch,
272
                    bool(evc.circuit_scheduler),
273
                )
274 1
            except InvalidPath as exception:
275 1
                raise HTTPException(
276
                    400,
277
                    detail=f"backup_path is not valid: {exception}"
278
                ) from exception
279
280 1
        if not evc._tag_lists_equal():
281 1
            detail = "UNI_A and UNI_Z tag lists should be the same."
282 1
            raise HTTPException(400, detail=detail)
283
284 1
        try:
285 1
            evc._validate_has_primary_or_dynamic()
286 1
        except ValueError as exception:
287 1
            raise HTTPException(400, detail=str(exception)) from exception
288
289 1
        try:
290 1
            self._check_no_tag_duplication(evc.id, evc.uni_a, evc.uni_z)
291
        except DuplicatedNoTagUNI as exception:
292
            log.debug("create_circuit result %s %s", exception, 409)
293
            raise HTTPException(409, detail=str(exception)) from exception
294
295 1
        try:
296 1
            self._use_uni_tags(evc)
297 1
        except KytosTagError as exception:
298 1
            raise HTTPException(400, detail=str(exception)) from exception
299
300
        # save circuit
301 1
        try:
302 1
            evc.sync()
303
        except ValidationError as exception:
304
            raise HTTPException(400, detail=str(exception)) from exception
305
306
        # store circuit in dictionary
307 1
        self.circuits[evc.id] = evc
308
309
        # Schedule the circuit deploy
310 1
        self.sched.add(evc)
311
312
        # Circuit has no schedule, deploy now
313 1
        if not evc.circuit_scheduler:
314 1
            with evc.lock:
315 1
                evc.deploy()
316
317
        # Notify users
318 1
        result = {"circuit_id": evc.id}
319 1
        status = 201
320 1
        log.debug("create_circuit result %s %s", result, status)
321 1
        emit_event(self.controller, name="created",
322
                   content=map_evc_event_content(evc))
323 1
        return JSONResponse(result, status_code=status)
324
325 1
    @staticmethod
326 1
    def _use_uni_tags(evc):
327 1
        uni_a = evc.uni_a
328 1
        evc._use_uni_vlan(uni_a)
329 1
        try:
330 1
            uni_z = evc.uni_z
331 1
            evc._use_uni_vlan(uni_z)
332 1
        except KytosTagError as err:
333 1
            evc.make_uni_vlan_available(uni_a)
334 1
            raise err
335
336 1
    @listen_to('kytos/flow_manager.flow.removed')
337 1
    def on_flow_delete(self, event):
338
        """Capture delete messages to keep track when flows got removed."""
339
        self.handle_flow_delete(event)
340
341 1
    def handle_flow_delete(self, event):
342
        """Keep track when the EVC got flows removed by deriving its cookie."""
343 1
        flow = event.content["flow"]
344 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
345 1
        if evc:
346 1
            log.debug("Flow removed in EVC %s", evc.id)
347 1
            evc.set_flow_removed_at()
348
349 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
350 1
    @validate_openapi(spec)
351 1
    def update(self, request: Request) -> JSONResponse:
352
        """Update a circuit based on payload.
353
354
        The EVC attributes (creation_time, active, current_path,
355
        failover_path, _id, archived) can't be updated.
356
        """
357 1
        data = get_json_or_400(request, self.controller.loop)
358 1
        circuit_id = request.path_params["circuit_id"]
359 1
        log.debug("update /v2/evc/%s", circuit_id)
360 1
        try:
361 1
            evc = self.circuits[circuit_id]
362 1
        except KeyError:
363 1
            result = f"circuit_id {circuit_id} not found"
364 1
            log.debug("update result %s %s", result, 404)
365 1
            raise HTTPException(404, detail=result) from KeyError
366
367 1
        try:
368 1
            updated_data = self._evc_dict_with_instances(data)
369 1
            self._check_no_tag_duplication(
370
                circuit_id, updated_data.get("uni_a"),
371
                updated_data.get("uni_z")
372
            )
373 1
            enable, redeploy = evc.update(**updated_data)
374 1
        except (ValueError, KytosTagError, ValidationError) as exception:
375 1
            log.debug("update result %s %s", exception, 400)
376 1
            raise HTTPException(400, detail=str(exception)) from exception
377 1
        except DuplicatedNoTagUNI as exception:
378
            log.debug("update result %s %s", exception, 409)
379
            raise HTTPException(409, detail=str(exception)) from exception
380 1
        except DisabledSwitch as exception:
381 1
            log.debug("update result %s %s", exception, 409)
382 1
            raise HTTPException(
383
                    409,
384
                    detail=f"Path is not valid: {exception}"
385
                ) from exception
386
387 1
        if evc.is_active():
388
            if enable is False:  # disable if active
389
                with evc.lock:
390
                    evc.remove()
391
            elif redeploy is not None:  # redeploy if active
392
                with evc.lock:
393
                    evc.remove()
394
                    evc.deploy()
395
        else:
396 1
            if enable is True:  # enable if inactive
397 1
                with evc.lock:
398 1
                    evc.deploy()
399 1
            elif evc.is_enabled() and redeploy:
400 1
                with evc.lock:
401 1
                    evc.remove()
402 1
                    evc.deploy()
403 1
        result = {evc.id: evc.as_dict()}
404 1
        status = 200
405
406 1
        log.debug("update result %s %s", result, status)
407 1
        emit_event(self.controller, "updated",
408
                   content=map_evc_event_content(evc, **data))
409 1
        return JSONResponse(result, status_code=status)
410
411 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
412 1
    def delete_circuit(self, request: Request) -> JSONResponse:
413
        """Remove a circuit.
414
415
        First, the flows are removed from the switches, and then the EVC is
416
        disabled.
417
        """
418 1
        circuit_id = request.path_params["circuit_id"]
419 1
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
420 1
        try:
421 1
            evc = self.circuits.pop(circuit_id)
422 1
        except KeyError:
423 1
            result = f"circuit_id {circuit_id} not found"
424 1
            log.debug("delete_circuit result %s %s", result, 404)
425 1
            raise HTTPException(404, detail=result) from KeyError
426
427 1
        log.info("Removing %s", evc)
428 1
        with evc.lock:
429 1
            evc.remove_current_flows()
430 1
            evc.remove_failover_flows(sync=False)
431 1
            evc.deactivate()
432 1
            evc.disable()
433 1
            self.sched.remove(evc)
434 1
            evc.archive()
435 1
            evc.remove_uni_tags()
436 1
            evc.sync()
437 1
        log.info("EVC removed. %s", evc)
438 1
        result = {"response": f"Circuit {circuit_id} removed"}
439 1
        status = 200
440
441 1
        log.debug("delete_circuit result %s %s", result, status)
442 1
        emit_event(self.controller, "deleted",
443
                   content=map_evc_event_content(evc))
444 1
        return JSONResponse(result, status_code=status)
445
446 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
447 1
    def get_metadata(self, request: Request) -> JSONResponse:
448
        """Get metadata from an EVC."""
449 1
        circuit_id = request.path_params["circuit_id"]
450 1
        try:
451 1
            return (
452
                JSONResponse({"metadata": self.circuits[circuit_id].metadata})
453
            )
454
        except KeyError as error:
455
            raise HTTPException(
456
                404,
457
                detail=f"circuit_id {circuit_id} not found."
458
            ) from error
459
460 1 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
461 1
    @validate_openapi(spec)
462 1
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
463
        """Add metadata to a bulk of EVCs."""
464 1
        data = get_json_or_400(request, self.controller.loop)
465 1
        circuit_ids = data.pop("circuit_ids")
466
467 1
        self.mongo_controller.update_evcs(circuit_ids, data, "add")
468
469 1
        fail_evcs = []
470 1
        for _id in circuit_ids:
471 1
            try:
472 1
                evc = self.circuits[_id]
473 1
                evc.extend_metadata(data)
474 1
            except KeyError:
475 1
                fail_evcs.append(_id)
476
477 1
        if fail_evcs:
478 1
            raise HTTPException(404, detail=fail_evcs)
479 1
        return JSONResponse("Operation successful", status_code=201)
480
481 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
482 1
    @validate_openapi(spec)
483 1
    def add_metadata(self, request: Request) -> JSONResponse:
484
        """Add metadata to an EVC."""
485 1
        circuit_id = request.path_params["circuit_id"]
486 1
        metadata = get_json_or_400(request, self.controller.loop)
487 1
        if not isinstance(metadata, dict):
488
            raise HTTPException(400, "Invalid metadata value: {metadata}")
489 1
        try:
490 1
            evc = self.circuits[circuit_id]
491 1
        except KeyError as error:
492 1
            raise HTTPException(
493
                404,
494
                detail=f"circuit_id {circuit_id} not found."
495
            ) from error
496
497 1
        evc.extend_metadata(metadata)
498 1
        evc.sync()
499 1
        return JSONResponse("Operation successful", status_code=201)
500
501 1 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
502 1
    @validate_openapi(spec)
503 1
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
504
        """Delete metada from a bulk of EVCs"""
505 1
        data = get_json_or_400(request, self.controller.loop)
506 1
        key = request.path_params["key"]
507 1
        circuit_ids = data.pop("circuit_ids")
508 1
        self.mongo_controller.update_evcs(circuit_ids, {key: ""}, "del")
509
510 1
        fail_evcs = []
511 1
        for _id in circuit_ids:
512 1
            try:
513 1
                evc = self.circuits[_id]
514 1
                evc.remove_metadata(key)
515 1
            except KeyError:
516 1
                fail_evcs.append(_id)
517
518 1
        if fail_evcs:
519 1
            raise HTTPException(404, detail=fail_evcs)
520 1
        return JSONResponse("Operation successful")
521
522 1
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
523 1
    def delete_metadata(self, request: Request) -> JSONResponse:
524
        """Delete metadata from an EVC."""
525 1
        circuit_id = request.path_params["circuit_id"]
526 1
        key = request.path_params["key"]
527 1
        try:
528 1
            evc = self.circuits[circuit_id]
529 1
        except KeyError as error:
530 1
            raise HTTPException(
531
                404,
532
                detail=f"circuit_id {circuit_id} not found."
533
            ) from error
534
535 1
        evc.remove_metadata(key)
536 1
        evc.sync()
537 1
        return JSONResponse("Operation successful")
538
539 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
540 1
    def redeploy(self, request: Request) -> JSONResponse:
541
        """Endpoint to force the redeployment of an EVC."""
542 1
        circuit_id = request.path_params["circuit_id"]
543 1
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
544 1
        try:
545 1
            evc = self.circuits[circuit_id]
546 1
        except KeyError:
547 1
            raise HTTPException(
548
                404,
549
                detail=f"circuit_id {circuit_id} not found"
550
            ) from KeyError
551 1
        if evc.is_enabled():
552 1
            with evc.lock:
553 1
                evc.remove_current_flows()
554 1
                evc.deploy()
555 1
            result = {"response": f"Circuit {circuit_id} redeploy received."}
556 1
            status = 202
557
        else:
558 1
            result = {"response": f"Circuit {circuit_id} is disabled."}
559 1
            status = 409
560
561 1
        return JSONResponse(result, status_code=status)
562
563 1
    @rest("/v2/evc/schedule/", methods=["POST"])
564 1
    @validate_openapi(spec)
565 1
    def create_schedule(self, request: Request) -> JSONResponse:
566
        """
567
        Create a new schedule for a given circuit.
568
569
        This service do no check if there are conflicts with another schedule.
570
        Payload example:
571
            {
572
              "circuit_id":"aa:bb:cc",
573
              "schedule": {
574
                "date": "2019-08-07T14:52:10.967Z",
575
                "interval": "string",
576
                "frequency": "1 * * * *",
577
                "action": "create"
578
              }
579
            }
580
        """
581 1
        log.debug("create_schedule /v2/evc/schedule/")
582 1
        data = get_json_or_400(request, self.controller.loop)
583 1
        circuit_id = data["circuit_id"]
584 1
        schedule_data = data["schedule"]
585
586
        # Get EVC from circuits buffer
587 1
        circuits = self._get_circuits_buffer()
588
589
        # get the circuit
590 1
        evc = circuits.get(circuit_id)
591
592
        # get the circuit
593 1
        if not evc:
594 1
            result = f"circuit_id {circuit_id} not found"
595 1
            log.debug("create_schedule result %s %s", result, 404)
596 1
            raise HTTPException(404, detail=result)
597
598
        # new schedule from dict
599 1
        new_schedule = CircuitSchedule.from_dict(schedule_data)
600
601
        # If there is no schedule, create the list
602 1
        if not evc.circuit_scheduler:
603 1
            evc.circuit_scheduler = []
604
605
        # Add the new schedule
606 1
        evc.circuit_scheduler.append(new_schedule)
607
608
        # Add schedule job
609 1
        self.sched.add_circuit_job(evc, new_schedule)
610
611
        # save circuit to mongodb
612 1
        evc.sync()
613
614 1
        result = new_schedule.as_dict()
615 1
        status = 201
616
617 1
        log.debug("create_schedule result %s %s", result, status)
618 1
        return JSONResponse(result, status_code=status)
619
620 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
621 1
    @validate_openapi(spec)
622 1
    def update_schedule(self, request: Request) -> JSONResponse:
623
        """Update a schedule.
624
625
        Change all attributes from the given schedule from a EVC circuit.
626
        The schedule ID is preserved as default.
627
        Payload example:
628
            {
629
              "date": "2019-08-07T14:52:10.967Z",
630
              "interval": "string",
631
              "frequency": "1 * * *",
632
              "action": "create"
633
            }
634
        """
635 1
        data = get_json_or_400(request, self.controller.loop)
636 1
        schedule_id = request.path_params["schedule_id"]
637 1
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)
638
639
        # Try to find a circuit schedule
640 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
641
642
        # Can not modify circuits deleted and archived
643 1
        if not found_schedule:
644 1
            result = f"schedule_id {schedule_id} not found"
645 1
            log.debug("update_schedule result %s %s", result, 404)
646 1
            raise HTTPException(404, detail=result)
647
648 1
        new_schedule = CircuitSchedule.from_dict(data)
649 1
        new_schedule.id = found_schedule.id
650
        # Remove the old schedule
651 1
        evc.circuit_scheduler.remove(found_schedule)
652
        # Append the modified schedule
653 1
        evc.circuit_scheduler.append(new_schedule)
654
655
        # Cancel all schedule jobs
656 1
        self.sched.cancel_job(found_schedule.id)
657
        # Add the new circuit schedule
658 1
        self.sched.add_circuit_job(evc, new_schedule)
659
        # Save EVC to mongodb
660 1
        evc.sync()
661
662 1
        result = new_schedule.as_dict()
663 1
        status = 200
664
665 1
        log.debug("update_schedule result %s %s", result, status)
666 1
        return JSONResponse(result, status_code=status)
667
668 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
669 1
    def delete_schedule(self, request: Request) -> JSONResponse:
670
        """Remove a circuit schedule.
671
672
        Remove the Schedule from EVC.
673
        Remove the Schedule from cron job.
674
        Save the EVC to the Storehouse.
675
        """
676 1
        schedule_id = request.path_params["schedule_id"]
677 1
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
678 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
679
680
        # Can not modify circuits deleted and archived
681 1
        if not found_schedule:
682 1
            result = f"schedule_id {schedule_id} not found"
683 1
            log.debug("delete_schedule result %s %s", result, 404)
684 1
            raise HTTPException(404, detail=result)
685
686
        # Remove the old schedule
687 1
        evc.circuit_scheduler.remove(found_schedule)
688
689
        # Cancel all schedule jobs
690 1
        self.sched.cancel_job(found_schedule.id)
691
        # Save EVC to mongodb
692 1
        evc.sync()
693
694 1
        result = "Schedule removed"
695 1
        status = 200
696
697 1
        log.debug("delete_schedule result %s %s", result, status)
698 1
        return JSONResponse(result, status_code=status)
699
700 1
    def _check_no_tag_duplication(
701
        self,
702
        evc_id: str,
703
        uni_a: Optional[UNI] = None,
704
        uni_z: Optional[UNI] = None
705
    ):
706
        """Check if the given EVC has UNIs with no tag and if these are
707
         duplicated. Raise DuplicatedNoTagUNI if duplication is found.
708
        Args:
709
            evc (dict): EVC to be analyzed.
710
        """
711
712
        # No UNIs
713 1
        if not (uni_a or uni_z):
714 1
            return
715
716 1
        if (not (uni_a and not uni_a.user_tag) and
717
                not (uni_z and not uni_z.user_tag)):
718 1
            return
719 1
        for circuit in self.circuits.copy().values():
720 1
            if (not circuit.archived and circuit._id != evc_id):
721 1
                if uni_a and uni_a.user_tag is None:
722 1
                    circuit.check_no_tag_duplicate(uni_a)
723 1
                if uni_z and uni_z.user_tag is None:
724 1
                    circuit.check_no_tag_duplicate(uni_z)
725
726 1
    @listen_to("kytos/topology.link_up")
727 1
    def on_link_up(self, event):
728
        """Change circuit when link is up or end_maintenance."""
729
        self.handle_link_up(event)
730
731 1
    def handle_link_up(self, event):
732
        """Change circuit when link is up or end_maintenance."""
733 1
        log.info("Event handle_link_up %s", event.content["link"])
734 1
        for evc in self.get_evcs_by_svc_level():
735 1
            if evc.is_enabled() and not evc.archived:
736 1
                with evc.lock:
737 1
                    evc.handle_link_up(event.content["link"])
738
739
    # Possibly replace this with interruptions?
740 1
    @listen_to(
741
        '.*.switch.interface.(link_up|link_down|created|deleted)'
742
    )
743 1
    def on_interface_link_change(self, event: KytosEvent):
744
        """
745
        Handler for interface link_up and link_down events.
746
        """
747
        self.handle_on_interface_link_change(event)
748
749 1
    def handle_on_interface_link_change(self, event: KytosEvent):
750
        """
751
        Handler to sort interface events {link_(up, down), create, deleted}
752
753
        To avoid multiple database updated (link flap):
754
        Every interface is identfied and processed in parallel.
755
        Once an interface event is received a time is started.
756
        While time is running self._intf_events will be updated.
757
        After time has passed last received event will be processed.
758
        """
759 1
        iface = event.content.get("interface")
760 1
        with self._lock_interfaces[iface.id]:
761 1
            _now = event.timestamp
762
            # Return out of order events
763 1
            if (
764
                iface.id in self._intf_events
765
                and self._intf_events[iface.id]["event"].timestamp > _now
766
            ):
767 1
                return
768 1
            self._intf_events[iface.id].update({"event": event})
769 1
            if "last_acquired" in self._intf_events[iface.id]:
770 1
                return
771 1
            self._intf_events[iface.id].update({"last_acquired": now()})
772 1
        time.sleep(settings.UNI_STATE_CHANGE_DELAY)
773 1
        with self._lock_interfaces[iface.id]:
774 1
            event = self._intf_events[iface.id]["event"]
775 1
            self._intf_events[iface.id].pop('last_acquired', None)
776 1
            _, _, event_type = event.name.rpartition('.')
777 1
            if event_type in ('link_up', 'created'):
778 1
                self.handle_interface_link_up(iface)
779 1
            elif event_type in ('link_down', 'deleted'):
780 1
                self.handle_interface_link_down(iface)
781
782 1
    def handle_interface_link_up(self, interface):
783
        """
784
        Handler for interface link_up events
785
        """
786
        for evc in self.get_evcs_by_svc_level():
787
            if evc.archived:
788
                continue
789
            log.info("Event handle_interface_link_up %s", interface)
790
            evc.handle_interface_link_up(
791
                interface
792
            )
793
794 1
    def handle_interface_link_down(self, interface):
795
        """
796
        Handler for interface link_down events
797
        """
798
        for evc in self.get_evcs_by_svc_level():
799
            if evc.archived:
800
                continue
801
            log.info("Event handle_interface_link_down %s", interface)
802
            evc.handle_interface_link_down(
803
                interface
804
            )
805
806 1
    @listen_to("kytos/topology.link_down")
807 1
    def on_link_down(self, event):
808
        """Change circuit when link is down or under_mantenance."""
809
        self.handle_link_down(event)
810
811 1
    def handle_link_down(self, event):  # pylint: disable=too-many-branches
812
        """Change circuit when link is down or under_mantenance."""
813 1
        link = event.content["link"]
814 1
        log.info("Event handle_link_down %s", link)
815 1
        switch_flows = {}
816 1
        evcs_with_failover = []
817 1
        evcs_normal = []
818 1
        check_failover = []
819 1
        for evc in self.get_evcs_by_svc_level():
820 1
            if evc.is_affected_by_link(link):
821
                # if there is no failover path, handles link down the
822
                # tradditional way
823 1
                if (
824
                    not getattr(evc, 'failover_path', None) or
825
                    evc.is_failover_path_affected_by_link(link)
826
                ):
827 1
                    evcs_normal.append(evc)
828 1
                    continue
829 1
                try:
830 1
                    dpid_flows = evc.get_failover_flows()
831
                # pylint: disable=broad-except
832 1
                except Exception:
833 1
                    err = traceback.format_exc().replace("\n", ", ")
834 1
                    log.error(
835
                        f"Ignore Failover path for {evc} due to error: {err}"
836
                    )
837 1
                    evcs_normal.append(evc)
838 1
                    continue
839 1
                for dpid, flows in dpid_flows.items():
840 1
                    switch_flows.setdefault(dpid, [])
841 1
                    switch_flows[dpid].extend(flows)
842 1
                evcs_with_failover.append(evc)
843
            else:
844 1
                check_failover.append(evc)
845
846 1
        while switch_flows:
847 1
            offset = settings.BATCH_SIZE or None
848 1
            switches = list(switch_flows.keys())
849 1
            for dpid in switches:
850 1
                emit_event(
851
                    self.controller,
852
                    context="kytos.flow_manager",
853
                    name="flows.install",
854
                    content={
855
                        "dpid": dpid,
856
                        "flow_dict": {"flows": switch_flows[dpid][:offset]},
857
                    }
858
                )
859 1
                if offset is None or offset >= len(switch_flows[dpid]):
860 1
                    del switch_flows[dpid]
861 1
                    continue
862 1
                switch_flows[dpid] = switch_flows[dpid][offset:]
863 1
            time.sleep(settings.BATCH_INTERVAL)
864
865 1
        for evc in evcs_with_failover:
866 1
            with evc.lock:
867 1
                old_path = evc.current_path
868 1
                evc.current_path = evc.failover_path
869 1
                evc.failover_path = old_path
870 1
                evc.sync()
871 1
            emit_event(self.controller, "redeployed_link_down",
872
                       content=map_evc_event_content(evc))
873 1
            log.info(
874
                f"{evc} redeployed with failover due to link down {link.id}"
875
            )
876
877 1
        for evc in evcs_normal:
878 1
            emit_event(
879
                self.controller,
880
                "evc_affected_by_link_down",
881
                content={"link_id": link.id} | map_evc_event_content(evc)
882
            )
883
884
        # After handling the hot path, check if new failover paths are needed.
885
        # Note that EVCs affected by link down will generate a KytosEvent for
886
        # deployed|redeployed, which will trigger the failover path setup.
887
        # Thus, we just need to further check the check_failover list
888 1
        for evc in check_failover:
889 1
            if evc.is_failover_path_affected_by_link(link):
890 1
                with evc.lock:
891 1
                    evc.setup_failover_path()
892
893 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
894 1
    def on_evc_affected_by_link_down(self, event):
895
        """Change circuit when link is down or under_mantenance."""
896
        self.handle_evc_affected_by_link_down(event)
897
898 1
    def handle_evc_affected_by_link_down(self, event):
899
        """Change circuit when link is down or under_mantenance."""
900 1
        evc = self.circuits.get(event.content["evc_id"])
901 1
        link_id = event.content['link_id']
902 1
        if not evc:
903 1
            return
904 1
        with evc.lock:
905 1
            result = evc.handle_link_down()
906 1
        event_name = "error_redeploy_link_down"
907 1
        if result:
908 1
            log.info(f"{evc} redeployed due to link down {link_id}")
909 1
            event_name = "redeployed_link_down"
910 1
        emit_event(self.controller, event_name,
911
                   content=map_evc_event_content(evc))
912
913 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
914 1
    def on_evc_deployed(self, event):
915
        """Handle EVC deployed|redeployed_link_down."""
916
        self.handle_evc_deployed(event)
917
918 1
    def handle_evc_deployed(self, event):
919
        """Setup failover path on evc deployed."""
920 1
        evc = self.circuits.get(event.content["evc_id"])
921 1
        if not evc:
922 1
            return
923 1
        with evc.lock:
924 1
            evc.setup_failover_path()
925
926 1
    @listen_to("kytos/topology.topology_loaded")
927 1
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
928
        """Load EVCs once the topology is available."""
929
        self.load_all_evcs()
930
931 1
    def load_all_evcs(self):
932
        """Try to load all EVCs on startup."""
933 1
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
934 1
        for circuit_id, circuit in circuits:
935 1
            if circuit_id not in self.circuits:
936 1
                self._load_evc(circuit)
937 1
        emit_event(self.controller, "evcs_loaded", content=dict(circuits),
938
                   timeout=1)
939
940 1
    def _load_evc(self, circuit_dict):
941
        """Load one EVC from mongodb to memory."""
942 1
        try:
943 1
            evc = self._evc_from_dict(circuit_dict)
944 1
        except (ValueError, KytosTagError) as exception:
945 1
            log.error(
946
                f"Could not load EVC: dict={circuit_dict} error={exception}"
947
            )
948 1
            return None
949 1
        if evc.archived:
950 1
            return None
951
952 1
        self.circuits.setdefault(evc.id, evc)
953 1
        self.sched.add(evc)
954 1
        return evc
955
956 1
    @listen_to("kytos/flow_manager.flow.error")
957 1
    def on_flow_mod_error(self, event):
958
        """Handle flow mod errors related to an EVC."""
959
        self.handle_flow_mod_error(event)
960
961 1
    def handle_flow_mod_error(self, event):
962
        """Handle flow mod errors related to an EVC."""
963 1
        flow = event.content["flow"]
964 1
        command = event.content.get("error_command")
965 1
        if command != "add":
966
            return
967 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
968 1
        if evc:
969 1
            with evc.lock:
970 1
                evc.remove_current_flows()
971
972 1
    def _evc_dict_with_instances(self, evc_dict):
973
        """Convert some dict values to instance of EVC classes.
974
975
        This method will convert: [UNI, Link]
976
        """
977 1
        data = evc_dict.copy()  # Do not modify the original dict
978 1
        for attribute, value in data.items():
979
            # Get multiple attributes.
980
            # Ex: uni_a, uni_z
981 1
            if "uni" in attribute:
982 1
                try:
983 1
                    data[attribute] = self._uni_from_dict(value)
984 1
                except ValueError as exception:
985 1
                    result = "Error creating UNI: Invalid value"
986 1
                    raise ValueError(result) from exception
987
988 1
            if attribute == "circuit_scheduler":
989 1
                data[attribute] = []
990 1
                for schedule in value:
991 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
992
993
            # Get multiple attributes.
994
            # Ex: primary_links,
995
            #     backup_links,
996
            #     current_links_cache,
997
            #     primary_links_cache,
998
            #     backup_links_cache
999 1
            if "links" in attribute:
1000 1
                data[attribute] = [
1001
                    self._link_from_dict(link) for link in value
1002
                ]
1003
1004
            # Ex: current_path,
1005
            #     primary_path,
1006
            #     backup_path
1007 1
            if "path" in attribute and attribute != "dynamic_backup_path":
1008 1
                data[attribute] = Path(
1009
                    [self._link_from_dict(link) for link in value]
1010
                )
1011
1012 1
        return data
1013
1014 1
    def _evc_from_dict(self, evc_dict):
1015 1
        data = self._evc_dict_with_instances(evc_dict)
1016 1
        data["table_group"] = self.table_group
1017 1
        return EVC(self.controller, **data)
1018
1019 1
    def _uni_from_dict(self, uni_dict):
1020
        """Return a UNI object from python dict."""
1021 1
        if uni_dict is None:
1022 1
            return False
1023
1024 1
        interface_id = uni_dict.get("interface_id")
1025 1
        interface = self.controller.get_interface_by_id(interface_id)
1026 1
        if interface is None:
1027 1
            result = (
1028
                "Error creating UNI:"
1029
                + f"Could not instantiate interface {interface_id}"
1030
            )
1031 1
            raise ValueError(result) from ValueError
1032 1
        tag_convert = {1: "vlan"}
1033 1
        tag_dict = uni_dict.get("tag", None)
1034 1
        if tag_dict:
1035 1
            tag_type = tag_dict.get("tag_type")
1036 1
            tag_type = tag_convert.get(tag_type, tag_type)
1037 1
            tag_value = tag_dict.get("value")
1038 1
            if isinstance(tag_value, list):
1039 1
                tag_value = get_tag_ranges(tag_value)
1040 1
                mask_list = get_vlan_tags_and_masks(tag_value)
1041 1
                tag = TAGRange(tag_type, tag_value, mask_list)
1042
            else:
1043 1
                tag = TAG(tag_type, tag_value)
1044
        else:
1045 1
            tag = None
1046 1
        uni = UNI(interface, tag)
1047 1
        return uni
1048
1049 1
    def _link_from_dict(self, link_dict):
1050
        """Return a Link object from python dict."""
1051 1
        id_a = link_dict.get("endpoint_a").get("id")
1052 1
        id_b = link_dict.get("endpoint_b").get("id")
1053
1054 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
1055 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
1056 1
        if not endpoint_a:
1057 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
1058 1
            raise ValueError(error_msg)
1059 1
        if not endpoint_b:
1060
            error_msg = f"Could not get interface endpoint_b id {id_b}"
1061
            raise ValueError(error_msg)
1062
1063 1
        link = Link(endpoint_a, endpoint_b)
1064 1
        if "metadata" in link_dict:
1065 1
            link.extend_metadata(link_dict.get("metadata"))
1066
1067 1
        s_vlan = link.get_metadata("s_vlan")
1068 1
        if s_vlan:
1069 1
            tag = TAG.from_dict(s_vlan)
1070 1
            if tag is False:
1071
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
1072
                raise ValueError(error_msg)
1073 1
            link.update_metadata("s_vlan", tag)
1074 1
        return link
1075
1076 1
    def _find_evc_by_schedule_id(self, schedule_id):
1077
        """
1078
        Find an EVC and CircuitSchedule based on schedule_id.
1079
1080
        :param schedule_id: Schedule ID
1081
        :return: EVC and Schedule
1082
        """
1083 1
        circuits = self._get_circuits_buffer()
1084 1
        found_schedule = None
1085 1
        evc = None
1086
1087
        # pylint: disable=unused-variable
1088 1
        for c_id, circuit in circuits.items():
1089 1
            for schedule in circuit.circuit_scheduler:
1090 1
                if schedule.id == schedule_id:
1091 1
                    found_schedule = schedule
1092 1
                    evc = circuit
1093 1
                    break
1094 1
            if found_schedule:
1095 1
                break
1096 1
        return evc, found_schedule
1097
1098 1
    def _get_circuits_buffer(self):
1099
        """
1100
        Return the circuit buffer.
1101
1102
        If the buffer is empty, try to load data from mongodb.
1103
        """
1104 1
        if not self.circuits:
1105
            # Load circuits from mongodb to buffer
1106 1
            circuits = self.mongo_controller.get_circuits()['circuits']
1107 1
            for c_id, circuit in circuits.items():
1108 1
                evc = self._evc_from_dict(circuit)
1109 1
                self.circuits[c_id] = evc
1110 1
        return self.circuits
1111
1112
    # pylint: disable=attribute-defined-outside-init
1113 1
    @alisten_to("kytos/of_multi_table.enable_table")
1114 1
    async def on_table_enabled(self, event):
1115
        """Handle a recently table enabled."""
1116 1
        table_group = event.content.get("mef_eline", None)
1117 1
        if not table_group:
1118 1
            return
1119 1
        for group in table_group:
1120 1
            if group not in settings.TABLE_GROUP_ALLOWED:
1121 1
                log.error(f'The table group "{group}" is not allowed for '
1122
                          f'mef_eline. Allowed table groups are '
1123
                          f'{settings.TABLE_GROUP_ALLOWED}')
1124 1
                return
1125 1
        self.table_group.update(table_group)
1126 1
        content = {"group_table": self.table_group}
1127 1
        name = "kytos/mef_eline.enable_table"
1128
        await aemit_event(self.controller, name, content)
1129