Passed
Pull Request — master (#321)
by
unknown
03:19
created

build.main.Main.on_table_enabled()   A

Complexity

Conditions 4

Size

Total Lines 16
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 12
CRAP Score 4.0072

Importance

Changes 0
Metric Value
cc 4
eloc 13
nop 2
dl 0
loc 16
rs 9.75
c 0
b 0
f 0
ccs 12
cts 13
cp 0.9231
crap 4.0072
1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
from threading import Lock
9
10 1
from flask import jsonify, request
11 1
from pydantic import ValidationError
12 1
from werkzeug.exceptions import (BadRequest, Conflict, Forbidden,
13
                                 MethodNotAllowed, NotFound,
14
                                 UnsupportedMediaType)
15
16 1
from kytos.core import KytosNApp, log, rest
17 1
from kytos.core.helpers import (alisten_to, listen_to, load_spec,
18
                                validate_openapi)
19 1
from kytos.core.interface import TAG, UNI
20 1
from kytos.core.link import Link
21 1
from napps.kytos.mef_eline import controllers, settings
22 1
from napps.kytos.mef_eline.exceptions import InvalidPath
23 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
24
                                          Path)
25 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
26 1
from napps.kytos.mef_eline.utils import (aemit_event, emit_event,
27
                                         map_evc_event_content)
28
29
30
# pylint: disable=too-many-public-methods
31 1
class Main(KytosNApp):
32
    """Main class of amlight/mef_eline NApp.
33
34
    This class is the entry point for this napp.
35
    """
36
37 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
38
39 1
    def setup(self):
        """Initialise the NApp state (stands in for ``__init__``).

        Creates the scheduler and the EVC storage controller, hands the
        Kytos controller to ``DynamicPathManager``, prepares the in-memory
        circuit buffer, starts the periodic ``execute`` loop and finally
        tries to load the EVCs.
        """
        # Scheduler for circuit events.
        self.sched = Scheduler()

        # Persistence layer used to save and load circuits.
        self.mongo_controller = self.get_eline_controller()
        self.mongo_controller.bootstrap_indexes()

        # The dynamic path manager needs the controller instance.
        DynamicPathManager.set_controller(self.controller)

        # In-memory buffer of EVCs keyed by circuit id; every
        # create/update/delete must also be synced to mongodb.
        self.circuits = dict()

        self.table_group = dict(epl=0, evpl=0)
        # Held by execute() while a consistency pass is running.
        self._lock = Lock()
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

        self.load_all_evcs()
66
67 1
    def get_evcs_by_svc_level(self) -> list:
68
        """Get circuits sorted by desc service level and asc creation_time.
69
70
        In the future, as more ops are offloaded it should be get from the DB.
71
        """
72 1
        return sorted(self.circuits.values(),
73
                      key=lambda x: (-x.service_level, x.creation_time))
74
75 1
    @staticmethod
    def get_eline_controller():
        """Create and return a new ``controllers.ELineController``."""
        eline_controller = controllers.ELineController()
        return eline_controller
79
80 1
    def execute(self):
        """Run one pass of the consistency routine.

        The pass is skipped when another one still holds ``self._lock``.
        """
        # A non-blocking acquire tests and takes the lock in one atomic step.
        # Checking ``locked()`` first and entering ``with`` afterwards would
        # let a second caller pass the check and then block on the lock
        # instead of skipping the round.
        if not self._lock.acquire(blocking=False):
            return
        try:
            log.debug("Starting consistency routine")
            self.execute_consistency()
        finally:
            self._lock.release()
        log.debug("Finished consistency routine")
88
89 1
    @staticmethod
90 1
    def should_be_checked(circuit):
91
        "Verify if the circuit meets the necessary conditions to be checked"
92
        # pylint: disable=too-many-boolean-expressions
93 1
        if (
94
                circuit.is_enabled()
95
                and not circuit.is_active()
96
                and not circuit.lock.locked()
97
                and not circuit.has_recent_removed_flow()
98
                and not circuit.is_recent_updated()
99
                # if a inter-switch EVC does not have current_path, it does not
100
                # make sense to run sdntrace on it
101
                and (circuit.is_intra_switch() or circuit.current_path)
102
                ):
103 1
            return True
104
        return False
105
106 1
    def execute_consistency(self):
        """Reconcile buffered EVCs with their traces and with mongodb.

        Circuits that pass ``should_be_checked`` are traced in one batch.
        A circuit whose trace checks out is activated and synced; otherwise
        its ``execution_rounds`` counter is incremented and, once it exceeds
        ``settings.WAIT_FOR_OLD_PATH``, the circuit is redeployed. Circuits
        stored in mongodb but absent from the buffer are loaded at the end.
        """
        only_in_db = self.mongo_controller.get_circuits()['circuits']
        candidates = []
        for evc in self.get_evcs_by_svc_level():
            # Whatever remains in only_in_db afterwards is not in the buffer.
            only_in_db.pop(evc.id, None)
            if self.should_be_checked(evc):
                candidates.append(evc)

        trace_results = EVCDeploy.check_list_traces(candidates)
        for evc in candidates:
            if trace_results.get(evc.id):
                evc.execution_rounds = 0
                log.info(f"{evc} enabled but inactive - activating")
                with evc.lock:
                    evc.activate()
                    evc.sync()
                continue

            evc.execution_rounds += 1
            if evc.execution_rounds > settings.WAIT_FOR_OLD_PATH:
                log.info(f"{evc} enabled but inactive - redeploy")
                with evc.lock:
                    evc.deploy()

        for evc_id, evc_data in only_in_db.items():
            log.info(f"EVC found in mongodb but unloaded {evc_id}")
            self._load_evc(evc_data)
132
133 1
    def shutdown(self):
        """Hook executed when the NApp is unloaded.

        Nothing needs to be cleaned up at the moment.
        """
138
139 1
    @rest("/v2/evc/", methods=["GET"])
    def list_circuits(self):
        """Endpoint that lists the stored circuits.

        The ``archived`` query argument (default ``"false"``) is lowercased
        and forwarded to the storage controller; every other query argument
        is forwarded as a metadata filter.
        """
        log.debug("list_circuits /v2/evc")
        query = request.args.to_dict()
        archived_filter = query.pop("archived", "false").lower()
        stored = self.mongo_controller.get_circuits(
            archived=archived_filter, metadata=query
        )
        return jsonify(stored['circuits']), 200
153
154 1
    @rest("/v2/evc/<circuit_id>", methods=["GET"])
    def get_circuit(self, circuit_id):
        """Endpoint that returns one stored circuit, looked up by its id.

        Raises:
            NotFound: the storage controller returned nothing for the id.
        """
        log.debug("get_circuit /v2/evc/%s", circuit_id)
        found = self.mongo_controller.get_circuit(circuit_id)
        if found:
            log.debug("get_circuit result %s %s", found, 200)
            return jsonify(found), 200

        message = f"circuit_id {circuit_id} not found"
        log.debug("get_circuit result %s %s", message, 404)
        raise NotFound(message)
166
167 1
    @rest("/v2/evc/", methods=["POST"])
    @validate_openapi(spec)
    def create_circuit(self, data):
        """Try to create a new circuit.

        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
        UNI_Z's requested C-VID are available from the interfaces' pools. This
        is checked when creating the UNI object.

        Then, E-Line NApp requests a primary and a backup path to the
        Pathfinder NApp using the attributes primary_links and backup_links
        submitted via REST

        # For each link composing paths in #3:
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
        #  - Using the S-VID obtained, generate abstract flow entries to be
        #    sent to FlowManager

        Push abstract flow entries to FlowManager and FlowManager pushes
        OpenFlow entries to datapaths

        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
        creation

        Finally, notify user of the status of its request.

        Returns:
            A ``({"circuit_id": <id>}, 201)`` JSON response.

        Raises:
            BadRequest: the payload cannot be turned into an EVC, a given
                primary/backup path is invalid, the EVC has neither a
                primary nor a dynamic path, or saving it fails validation.
            Conflict: a non-archived EVC sharing a UNI already exists.
        """
        log.debug("create_circuit /v2/evc/")

        try:
            evc = self._evc_from_dict(data)
        except ValueError as exception:
            log.debug("create_circuit result %s %s", exception, 400)
            # Chain the caught error; ``from BadRequest`` would attach an
            # empty BadRequest instance as the cause and hide the real one.
            raise BadRequest(str(exception)) from exception

        # Both static paths, when given, must connect the two UNI switches.
        for path_name in ("primary_path", "backup_path"):
            path = getattr(evc, path_name)
            if not path:
                continue
            try:
                path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise BadRequest(
                    f"{path_name} is not valid: {exception}"
                ) from exception

        # verify duplicated evc
        if self._is_duplicated_evc(evc):
            result = "The EVC already exists."
            log.debug("create_circuit result %s %s", result, 409)
            raise Conflict(result)

        try:
            evc._validate_has_primary_or_dynamic()
        except ValueError as exception:
            raise BadRequest(str(exception)) from exception

        # save circuit
        try:
            evc.sync()
        except ValidationError as exception:
            raise BadRequest(str(exception)) from exception

        # store circuit in dictionary
        self.circuits[evc.id] = evc

        # Schedule the circuit deploy
        self.sched.add(evc)

        # Circuit has no schedule, deploy now
        if not evc.circuit_scheduler:
            with evc.lock:
                evc.deploy()

        # Notify users
        result = {"circuit_id": evc.id}
        status = 201
        log.debug("create_circuit result %s %s", result, status)
        emit_event(self.controller, name="created",
                   content=map_evc_event_content(evc))
        return jsonify(result), status
260
261 1
    @listen_to('kytos/flow_manager.flow.removed')
    def on_flow_delete(self, event):
        """Listener for flow-removed events; delegates to the handler."""
        self.handle_flow_delete(event)
265
266 1
    def handle_flow_delete(self, event):
        """Record on the owning EVC that one of its flows was removed.

        The EVC id is derived from the removed flow's cookie; events for
        circuits that are not in the buffer are ignored.
        """
        removed_flow = event.content["flow"]
        circuit_id = EVC.get_id_from_cookie(removed_flow.cookie)
        circuit = self.circuits.get(circuit_id)
        if not circuit:
            return
        log.debug("Flow removed in EVC %s", circuit.id)
        circuit.set_flow_removed_at()
273
274 1
    @rest("/v2/evc/<circuit_id>", methods=["PATCH"])
    @validate_openapi(spec)
    def update(self, data, circuit_id):
        """Update a circuit based on payload.

        The EVC attributes (creation_time, active, current_path,
        failover_path, _id, archived) can't be updated.

        After the attributes are updated, an active EVC is removed when the
        update disables it, or removed and deployed again when a redeploy
        is requested; an inactive EVC is deployed when the update enables it.

        Returns:
            A ``({<evc id>: <evc as dict>}, 200)`` JSON response.

        Raises:
            NotFound: ``circuit_id`` is not in the circuits buffer.
            MethodNotAllowed: the EVC is archived.
            BadRequest: the payload is rejected by ``evc.update``.
        """
        log.debug("update /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("update result %s %s", result, 404)
            # Chain the KeyError; ``from NotFound`` would attach an empty
            # NotFound instance as the cause.
            raise NotFound(result) from error

        if evc.archived:
            result = "Can't update archived EVC"
            log.debug("update result %s %s", result, 405)
            raise MethodNotAllowed(["GET"], result)

        try:
            enable, redeploy = evc.update(
                **self._evc_dict_with_instances(data)
            )
        except ValidationError as exception:
            raise BadRequest(str(exception)) from exception
        except ValueError as exception:
            log.error(exception)
            log.debug("update result %s %s", exception, 400)
            raise BadRequest(str(exception)) from exception

        if evc.is_active():
            if enable is False:  # disable if active
                with evc.lock:
                    evc.remove()
            elif redeploy is not None:  # redeploy if active
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        elif enable is True:  # enable if inactive
            with evc.lock:
                evc.deploy()

        result = {evc.id: evc.as_dict()}
        status = 200

        log.debug("update result %s %s", result, status)
        emit_event(self.controller, "updated",
                   content=map_evc_event_content(evc, **data))
        return jsonify(result), status
325
326 1
    @rest("/v2/evc/<circuit_id>", methods=["DELETE"])
    def delete_circuit(self, circuit_id):
        """Remove a circuit.

        First, the flows are removed from the switches, and then the EVC is
        deactivated, disabled, unscheduled, archived and synced.

        Returns:
            A ``({"response": "Circuit <id> removed"}, 200)`` JSON response.

        Raises:
            NotFound: ``circuit_id`` is not in the circuits buffer or the
                EVC is already archived.
        """
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("delete_circuit result %s %s", result, 404)
            # Chain the KeyError rather than the NotFound class.
            raise NotFound(result) from error

        if evc.archived:
            result = f"Circuit {circuit_id} already removed"
            log.debug("delete_circuit result %s %s", result, 404)
            # No exception is being handled here, so there is no cause to
            # chain.
            raise NotFound(result)

        log.info("Removing %s", evc)
        with evc.lock:
            evc.remove_current_flows()
            evc.remove_failover_flows(sync=False)
            evc.deactivate()
            evc.disable()
            self.sched.remove(evc)
            evc.archive()
            evc.sync()
        log.info("EVC removed. %s", evc)
        result = {"response": f"Circuit {circuit_id} removed"}
        status = 200

        log.debug("delete_circuit result %s %s", result, status)
        emit_event(self.controller, "deleted",
                   content=map_evc_event_content(evc))
        return jsonify(result), status
363
364 1
    @rest("v2/evc/<circuit_id>/metadata", methods=["GET"])
    def get_metadata(self, circuit_id):
        """Return the metadata of a buffered EVC.

        Raises:
            NotFound: a ``KeyError`` was raised while building the response
                (typically ``circuit_id`` missing from the buffer).
        """
        try:
            payload = {"metadata": self.circuits[circuit_id].metadata}
            return jsonify(payload), 200
        except KeyError as error:
            raise NotFound(f"circuit_id {circuit_id} not found.") from error
374
375 1 View Code Duplication
    @rest("v2/evc/metadata", methods=["POST"])
    @validate_openapi(spec)
    def bulk_add_metadata(self, data):
        """Add the same metadata to several EVCs at once.

        ``circuit_ids`` is popped from ``data``; what remains is the
        metadata. Storage is updated first, then each buffered EVC. Ids
        that raise ``KeyError`` are returned with status 404.
        """
        circuit_ids = data.pop("circuit_ids")
        self.mongo_controller.update_evcs(circuit_ids, data, "add")

        missing = []
        for evc_id in circuit_ids:
            try:
                self.circuits[evc_id].extend_metadata(data)
            except KeyError:
                missing.append(evc_id)

        if not missing:
            return jsonify("Operation successful"), 201
        return jsonify(missing), 404
393
394 1
    @rest("v2/evc/<circuit_id>/metadata", methods=["POST"])
    def add_metadata(self, circuit_id):
        """Add the metadata sent in the request body to one EVC.

        Raises:
            BadRequest: the body is not well-formed JSON or the request has
                no content type.
            UnsupportedMediaType: no JSON payload could be read.
            NotFound: ``circuit_id`` is not in the circuits buffer.
        """
        try:
            metadata = request.get_json()
            content_type = request.content_type
        except BadRequest as error:
            raise BadRequest(
                "The request body is not a well-formed JSON."
            ) from error

        if content_type is None:
            raise BadRequest("The request body is empty.")

        if metadata is None:
            if content_type == "application/json":
                reason = "Metadata is empty."
            else:
                reason = (
                    "The content type must be application/json "
                    f"(received {content_type})."
                )
            raise UnsupportedMediaType(reason)

        try:
            target = self.circuits[circuit_id]
        except KeyError as error:
            raise NotFound(f"circuit_id {circuit_id} not found.") from error

        target.extend_metadata(metadata)
        target.sync()
        return jsonify("Operation successful"), 201
424
425 1 View Code Duplication
    @rest("v2/evc/metadata/<key>", methods=["DELETE"])
    @validate_openapi(spec)
    def bulk_delete_metadata(self, data, key):
        """Delete one metadata key from several EVCs at once.

        Storage is updated first, then each buffered EVC. Ids that raise
        ``KeyError`` are returned with status 404.
        """
        circuit_ids = data.pop("circuit_ids")
        self.mongo_controller.update_evcs(circuit_ids, {key: ""}, "del")

        missing = []
        for evc_id in circuit_ids:
            try:
                self.circuits[evc_id].remove_metadata(key)
            except KeyError:
                missing.append(evc_id)

        if not missing:
            return jsonify("Operation successful"), 200
        return jsonify(missing), 404
443
444 1
    @rest("v2/evc/<circuit_id>/metadata/<key>", methods=["DELETE"])
    def delete_metadata(self, circuit_id, key):
        """Delete one metadata key from an EVC and sync the EVC.

        Raises:
            NotFound: ``circuit_id`` is not in the circuits buffer.
        """
        try:
            target = self.circuits[circuit_id]
        except KeyError as error:
            raise NotFound(f"circuit_id {circuit_id} not found.") from error

        target.remove_metadata(key)
        target.sync()
        return jsonify("Operation successful"), 200
455
456 1
    @rest("/v2/evc/<circuit_id>/redeploy", methods=["PATCH"])
    def redeploy(self, circuit_id):
        """Endpoint to force the redeployment of an EVC.

        An enabled EVC has its current flows removed and is deployed again
        (status 202); a disabled EVC is left untouched (status 409).

        Raises:
            NotFound: ``circuit_id`` is not in the circuits buffer.
        """
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            # Chain the KeyError; ``from NotFound`` would attach an empty
            # NotFound instance as the cause.
            raise NotFound(result) from error

        if evc.is_enabled():
            with evc.lock:
                evc.remove_current_flows()
                evc.deploy()
            result = {"response": f"Circuit {circuit_id} redeploy received."}
            status = 202
        else:
            result = {"response": f"Circuit {circuit_id} is disabled."}
            status = 409

        return jsonify(result), status
476
477 1
    @rest("/v2/evc/schedule", methods=["GET"])
    def list_schedules(self):
        """Endpoint that lists every schedule of every stored circuit.

        The JSON response follows the template:
        [{"schedule_id": <schedule_id>,
         "circuit_id": <circuit_id>,
         "schedule": <schedule object>}]

        When no circuit is stored at all the response body is ``{}``.
        """
        log.debug("list_schedules /v2/evc/schedule")
        stored = self.mongo_controller.get_circuits()['circuits'].values()
        if not stored:
            return jsonify({}), 200

        status = 200
        # Circuits without (or with an empty) circuit_scheduler add nothing.
        result = [
            {
                "schedule_id": schedule.get("id"),
                "circuit_id": circuit.get("id"),
                "schedule": schedule,
            }
            for circuit in stored
            for schedule in (circuit.get("circuit_scheduler") or ())
        ]

        log.debug("list_schedules result %s %s", result, status)
        return jsonify(result), status
508
509 1
    @rest("/v2/evc/schedule/", methods=["POST"])
    @validate_openapi(spec)
    def create_schedule(self, data):
        """
        Create a new schedule for a given circuit.

        This service does not check if there are conflicts with another
        schedule.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }

        Returns:
            A ``(<new schedule as dict>, 201)`` JSON response.

        Raises:
            NotFound: ``circuit_id`` is not in the circuits buffer.
            Forbidden: the circuit is archived.
        """
        log.debug("create_schedule /v2/evc/schedule/")
        circuit_id = data["circuit_id"]
        schedule_data = data["schedule"]

        # Get EVC from circuits buffer
        circuits = self._get_circuits_buffer()
        evc = circuits.get(circuit_id)

        if not evc:
            result = f"circuit_id {circuit_id} not found"
            log.debug("create_schedule result %s %s", result, 404)
            # No exception is being handled here, so nothing is chained;
            # ``from NotFound`` would attach an empty NotFound as the cause.
            raise NotFound(result)
        # Can not modify circuits deleted and archived
        if evc.archived:
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
            log.debug("create_schedule result %s %s", result, 403)
            raise Forbidden(result)

        # new schedule from dict
        new_schedule = CircuitSchedule.from_dict(schedule_data)

        # If there is no schedule, create the list
        if not evc.circuit_scheduler:
            evc.circuit_scheduler = []

        # Add the new schedule
        evc.circuit_scheduler.append(new_schedule)

        # Add schedule job
        self.sched.add_circuit_job(evc, new_schedule)

        # save circuit to mongodb
        evc.sync()

        result = new_schedule.as_dict()
        status = 201

        log.debug("create_schedule result %s %s", result, status)
        return jsonify(result), status
569
570 1
    @rest("/v2/evc/schedule/<schedule_id>", methods=["PATCH"])
    @validate_openapi(spec)
    def update_schedule(self, data, schedule_id):
        """Update a schedule.

        Change all attributes from the given schedule from a EVC circuit.
        The schedule ID is preserved as default.
        Payload example:
            {
              "date": "2019-08-07T14:52:10.967Z",
              "interval": "string",
              "frequency": "1 * * *",
              "action": "create"
            }

        Returns:
            A ``(<updated schedule as dict>, 200)`` JSON response.

        Raises:
            NotFound: no schedule with ``schedule_id`` was found.
            Forbidden: the owning circuit is archived.
        """
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)

        # Try to find a circuit schedule
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)

        if not found_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("update_schedule result %s %s", result, 404)
            # No exception is being handled here, so nothing is chained;
            # ``from NotFound`` would attach an empty NotFound as the cause.
            raise NotFound(result)
        # Can not modify circuits deleted and archived
        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("update_schedule result %s %s", result, 403)
            raise Forbidden(result)

        new_schedule = CircuitSchedule.from_dict(data)
        new_schedule.id = found_schedule.id
        # Remove the old schedule
        evc.circuit_scheduler.remove(found_schedule)
        # Append the modified schedule
        evc.circuit_scheduler.append(new_schedule)

        # Cancel the job of the old schedule
        self.sched.cancel_job(found_schedule.id)
        # Add the new circuit schedule
        self.sched.add_circuit_job(evc, new_schedule)
        # Save EVC to mongodb
        evc.sync()

        result = new_schedule.as_dict()
        status = 200

        log.debug("update_schedule result %s %s", result, status)
        return jsonify(result), status
619
620 1
    @rest("/v2/evc/schedule/<schedule_id>", methods=["DELETE"])
    def delete_schedule(self, schedule_id):
        """Remove a circuit schedule.

        The schedule is dropped from its EVC, its job is cancelled in the
        scheduler and the EVC is synced.

        Raises:
            NotFound: no schedule with ``schedule_id`` was found.
            Forbidden: the owning circuit is archived.
        """
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
        evc, schedule = self._find_evc_by_schedule_id(schedule_id)

        if not schedule:
            message = f"schedule_id {schedule_id} not found"
            log.debug("delete_schedule result %s %s", message, 404)
            raise NotFound(message)

        # Archived (deleted) circuits can not be modified.
        if evc.archived:
            message = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("delete_schedule result %s %s", message, 403)
            raise Forbidden(message)

        evc.circuit_scheduler.remove(schedule)
        self.sched.cancel_job(schedule.id)
        # Persist the EVC without the schedule.
        evc.sync()

        outcome, status = "Schedule removed", 200
        log.debug("delete_schedule result %s %s", outcome, status)
        return jsonify(outcome), status
655
656 1
    def _is_duplicated_evc(self, evc):
657
        """Verify if the circuit given is duplicated with the stored evcs.
658
659
        Args:
660
            evc (EVC): circuit to be analysed.
661
662
        Returns:
663
            boolean: True if the circuit is duplicated, otherwise False.
664
665
        """
666 1
        for circuit in tuple(self.circuits.values()):
667 1
            if not circuit.archived and circuit.shares_uni(evc):
668 1
                return True
669 1
        return False
670
671 1
    @listen_to("kytos/topology.link_up")
    def on_link_up(self, event):
        """Listener for link-up events; delegates to ``handle_link_up``."""
        self.handle_link_up(event)
675
676 1
    def handle_link_up(self, event):
        """Notify every enabled, non-archived EVC that a link came up.

        EVCs are visited in the order given by ``get_evcs_by_svc_level`` and
        each one is handled while holding its own lock.
        """
        link = event.content["link"]
        log.info("Event handle_link_up %s", link)
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_enabled() or evc.archived:
                continue
            with evc.lock:
                evc.handle_link_up(link)
683
684 1
    @listen_to("kytos/topology.link_down")
    def on_link_down(self, event):
        """Listener for link-down events; delegates to ``handle_link_down``."""
        self.handle_link_down(event)
688
689 1
    def handle_link_down(self, event):
        """React to a link going down.

        Affected EVCs with a usable failover path get their failover flows
        installed in batches and then swap current/failover paths. Affected
        EVCs without one are reported through an
        ``evc_affected_by_link_down`` event. Unaffected EVCs whose failover
        path uses the link get a new failover path set up.
        """
        link = event.content["link"]
        log.info("Event handle_link_down %s", link)

        flows_by_switch = {}
        failover_evcs = []
        regular_evcs = []
        unaffected_evcs = []
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_affected_by_link(link):
                unaffected_evcs.append(evc)
                continue
            # Without a failover path, or with one that is also hit by this
            # link, fall back to the traditional link-down handling.
            if (
                not getattr(evc, 'failover_path', None) or
                evc.is_failover_path_affected_by_link(link)
            ):
                regular_evcs.append(evc)
                continue
            for dpid, flows in evc.get_failover_flows().items():
                flows_by_switch.setdefault(dpid, []).extend(flows)
            failover_evcs.append(evc)

        # NOTE(review): offset accumulates across rounds while the pending
        # lists are also sliced, so each round sends a larger batch than the
        # previous one - confirm this ramp-up is intended.
        offset = 0
        while flows_by_switch:
            # A zero sum becomes None, which makes the slice take everything.
            offset = (offset + settings.BATCH_SIZE) or None
            for dpid in list(flows_by_switch):
                pending = flows_by_switch[dpid]
                emit_event(
                    self.controller,
                    context="kytos.flow_manager",
                    name="flows.install",
                    content={
                        "dpid": dpid,
                        "flow_dict": {"flows": pending[:offset]},
                    }
                )
                if offset is None or offset >= len(pending):
                    del flows_by_switch[dpid]
                else:
                    flows_by_switch[dpid] = pending[offset:]
            time.sleep(settings.BATCH_INTERVAL)

        for evc in failover_evcs:
            with evc.lock:
                # Swap the current and failover paths.
                previous_path = evc.current_path
                evc.current_path = evc.failover_path
                evc.failover_path = previous_path
                evc.sync()
            emit_event(self.controller, "redeployed_link_down",
                       content=map_evc_event_content(evc))
            log.info(
                f"{evc} redeployed with failover due to link down {link.id}"
            )

        for evc in regular_evcs:
            emit_event(
                self.controller,
                "evc_affected_by_link_down",
                content={"link_id": link.id} | map_evc_event_content(evc)
            )

        # After handling the hot path, check if new failover paths are needed.
        # EVCs affected by link down will generate a KytosEvent for
        # deployed|redeployed, which will trigger the failover path setup, so
        # only the unaffected EVCs need checking here.
        for evc in unaffected_evcs:
            if evc.is_failover_path_affected_by_link(link):
                evc.setup_failover_path()
760
761 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
    def on_evc_affected_by_link_down(self, event):
        """React to an EVC reported as affected by a link down.

        Thin listener: the event is forwarded unchanged to
        ``handle_evc_affected_by_link_down``, which does the actual work.
        """
        self.handle_evc_affected_by_link_down(event)
765
766 1
    def handle_evc_affected_by_link_down(self, event):
        """Try to redeploy the EVC named in the event after a link down.

        The event content must carry both ``evc_id`` and ``link_id``.
        Nothing happens when the EVC is not known to this NApp. Otherwise
        ``evc.handle_link_down()`` runs under the EVC lock and its outcome
        is published as ``redeployed_link_down`` (truthy result) or
        ``error_redeploy_link_down`` (falsy result).
        """
        content = event.content
        evc = self.circuits.get(content["evc_id"])
        # Read before the EVC check so a malformed event fails the same way
        # regardless of whether the EVC exists.
        link_id = content['link_id']
        if not evc:
            return

        with evc.lock:
            redeployed = evc.handle_link_down()

        if redeployed:
            log.info(f"{evc} redeployed due to link down {link_id}")
            event_name = "redeployed_link_down"
        else:
            event_name = "error_redeploy_link_down"
        emit_event(
            self.controller,
            event_name,
            content=map_evc_event_content(evc),
        )
780
781 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
    def on_evc_deployed(self, event):
        """React to an EVC deployed, redeployed_link_up or redeployed_link_down.

        Thin listener: the event is forwarded unchanged to
        ``handle_evc_deployed``.
        """
        self.handle_evc_deployed(event)
785
786 1
    def handle_evc_deployed(self, event):
        """Set up the failover path of the EVC named in the event.

        The EVC is looked up by ``event.content["evc_id"]``; unknown EVCs
        are ignored. The failover setup runs while holding the EVC lock.
        """
        evc = self.circuits.get(event.content["evc_id"])
        if evc:
            with evc.lock:
                evc.setup_failover_path()
793
794 1
    @listen_to("kytos/topology.topology_loaded")
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
        """Load every stored EVC when the topology reports it is loaded.

        The event itself is not inspected; it only triggers
        ``load_all_evcs``.
        """
        self.load_all_evcs()
798
799 1
    def load_all_evcs(self):
        """Load from mongodb every circuit that is not in memory yet.

        Circuits whose id is already a key of ``self.circuits`` are left
        untouched; the others are handed to ``_load_evc``.
        """
        stored = self.mongo_controller.get_circuits()['circuits']
        for circuit_id, circuit in stored.items():
            if circuit_id in self.circuits:
                continue
            self._load_evc(circuit)
805
806 1
    def _load_evc(self, circuit_dict):
807
        """Load one EVC from mongodb to memory."""
808 1
        try:
809 1
            evc = self._evc_from_dict(circuit_dict)
810 1
        except ValueError as exception:
811 1
            log.error(
812
                f"Could not load EVC: dict={circuit_dict} error={exception}"
813
            )
814 1
            return None
815
816 1
        if evc.archived:
817 1
            return None
818 1
        evc.deactivate()
819 1
        evc.sync()
820 1
        self.circuits.setdefault(evc.id, evc)
821 1
        self.sched.add(evc)
822 1
        return evc
823
824 1
    @listen_to("kytos/flow_manager.flow.error")
    def on_flow_mod_error(self, event):
        """React to a flow error reported by flow_manager.

        Thin listener: the event is forwarded unchanged to
        ``handle_flow_mod_error``.
        """
        self.handle_flow_mod_error(event)
828
829 1
    def handle_flow_mod_error(self, event):
        """Remove the current flows of the EVC whose flow add failed.

        Only errors whose ``error_command`` is ``"add"`` are handled. The
        EVC is resolved from the cookie of the failing flow; when no such
        EVC is known, nothing is done.
        """
        content = event.content
        flow = content["flow"]
        if content.get("error_command") == "add":
            evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
            if evc:
                evc.remove_current_flows()
838
839 1
    def _evc_dict_with_instances(self, evc_dict):
840
        """Convert some dict values to instance of EVC classes.
841
842
        This method will convert: [UNI, Link]
843
        """
844 1
        data = evc_dict.copy()  # Do not modify the original dict
845 1
        for attribute, value in data.items():
846
            # Get multiple attributes.
847
            # Ex: uni_a, uni_z
848 1
            if "uni" in attribute:
849 1
                try:
850 1
                    data[attribute] = self._uni_from_dict(value)
851 1
                except ValueError as exception:
852 1
                    result = "Error creating UNI: Invalid value"
853 1
                    raise ValueError(result) from exception
854
855 1
            if attribute == "circuit_scheduler":
856 1
                data[attribute] = []
857 1
                for schedule in value:
858 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
859
860
            # Get multiple attributes.
861
            # Ex: primary_links,
862
            #     backup_links,
863
            #     current_links_cache,
864
            #     primary_links_cache,
865
            #     backup_links_cache
866 1
            if "links" in attribute:
867 1
                data[attribute] = [
868
                    self._link_from_dict(link) for link in value
869
                ]
870
871
            # Ex: current_path,
872
            #     primary_path,
873
            #     backup_path
874 1
            if "path" in attribute and attribute != "dynamic_backup_path":
875 1
                data[attribute] = Path(
876
                    [self._link_from_dict(link) for link in value]
877
                )
878
879 1
        return data
880
881 1
    def _evc_from_dict(self, evc_dict):
        """Return an EVC built from a plain dict.

        The dict values are first converted by ``_evc_dict_with_instances``
        and this NApp's ``table_group`` is passed as the ``table_group``
        keyword, replacing any value present in the dict.
        """
        evc_kwargs = self._evc_dict_with_instances(evc_dict)
        evc_kwargs["table_group"] = self.table_group
        return EVC(self.controller, **evc_kwargs)
885
886 1
    def _uni_from_dict(self, uni_dict):
887
        """Return a UNI object from python dict."""
888 1
        if uni_dict is None:
889 1
            return False
890
891 1
        interface_id = uni_dict.get("interface_id")
892 1
        interface = self.controller.get_interface_by_id(interface_id)
893 1
        if interface is None:
894 1
            result = (
895
                "Error creating UNI:"
896
                + f"Could not instantiate interface {interface_id}"
897
            )
898 1
            raise ValueError(result) from ValueError
899
900 1
        tag_dict = uni_dict.get("tag", None)
901 1
        if tag_dict:
902 1
            tag = TAG.from_dict(tag_dict)
903
        else:
904 1
            tag = None
905 1
        uni = UNI(interface, tag)
906
907 1
        return uni
908
909 1
    def _link_from_dict(self, link_dict):
910
        """Return a Link object from python dict."""
911 1
        id_a = link_dict.get("endpoint_a").get("id")
912 1
        id_b = link_dict.get("endpoint_b").get("id")
913
914 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
915 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
916 1
        if not endpoint_a:
917 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
918 1
            raise ValueError(error_msg)
919 1
        if not endpoint_b:
920
            error_msg = f"Could not get interface endpoint_b id {id_b}"
921
            raise ValueError(error_msg)
922
923 1
        link = Link(endpoint_a, endpoint_b)
924 1
        if "metadata" in link_dict:
925 1
            link.extend_metadata(link_dict.get("metadata"))
926
927 1
        s_vlan = link.get_metadata("s_vlan")
928 1
        if s_vlan:
929 1
            tag = TAG.from_dict(s_vlan)
930 1
            if tag is False:
931
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
932
                raise ValueError(error_msg)
933 1
            link.update_metadata("s_vlan", tag)
934 1
        return link
935
936 1
    def _find_evc_by_schedule_id(self, schedule_id):
937
        """
938
        Find an EVC and CircuitSchedule based on schedule_id.
939
940
        :param schedule_id: Schedule ID
941
        :return: EVC and Schedule
942
        """
943 1
        circuits = self._get_circuits_buffer()
944 1
        found_schedule = None
945 1
        evc = None
946
947
        # pylint: disable=unused-variable
948 1
        for c_id, circuit in circuits.items():
949 1
            for schedule in circuit.circuit_scheduler:
950 1
                if schedule.id == schedule_id:
951 1
                    found_schedule = schedule
952 1
                    evc = circuit
953 1
                    break
954 1
            if found_schedule:
955 1
                break
956 1
        return evc, found_schedule
957
958 1
    def _get_circuits_buffer(self):
959
        """
960
        Return the circuit buffer.
961
962
        If the buffer is empty, try to load data from mongodb.
963
        """
964 1
        if not self.circuits:
965
            # Load circuits from mongodb to buffer
966 1
            circuits = self.mongo_controller.get_circuits()['circuits']
967 1
            for c_id, circuit in circuits.items():
968 1
                evc = self._evc_from_dict(circuit)
969 1
                self.circuits[c_id] = evc
970 1
        return self.circuits
971
972
    # pylint: disable=attribute-defined-outside-init
973 1
    @alisten_to("kytos/of_multi_table.enable_table")
    async def on_table_enabled(self, event):
        """Handle a recently enabled table.

        Reads the ``mef_eline`` entry of the event content and copies each
        of its groups into ``self.table_group``. Processing stops with an
        error log, and without emitting anything, at the first group that
        is not in ``settings.TABLE_GROUP_ALLOWED``. On success the updated
        table group is published as ``kytos/mef_eline.enable_table``.
        """
        requested = event.content.get("mef_eline", None)
        if not requested:
            return

        for group in requested:
            if group not in settings.TABLE_GROUP_ALLOWED:
                log.error(f'The table group "{group}" is not allowed for '
                          f'mef_eline. Allowed table groups are '
                          f'{settings.TABLE_GROUP_ALLOWED}')
                return
            self.table_group[group] = requested[group]

        await aemit_event(
            self.controller,
            "kytos/mef_eline.enable_table",
            {"group_table": self.table_group},
        )
989