Passed
Pull Request — master (#205)
by Vinicius
03:25
created

build.main   F

Complexity

Total Complexity 161

Size/Duplication

Total Lines 951
Duplicated Lines 0 %

Test Coverage

Coverage 92.41%

Importance

Changes 0
Metric Value
wmc 161
eloc 604
dl 0
loc 951
rs 1.996
c 0
b 0
f 0
ccs 511
cts 553
cp 0.9241

40 Methods

Rating   Name   Duplication   Size   Complexity  
A Main.setup() 0 27 1
A Main.list_circuits() 0 23 3
A Main.get_circuit() 0 12 2
A Main.shutdown() 0 2 1
A Main.execute() 0 11 4
A Main.get_evcs_by_svc_level() 0 7 2
A Main.get_eline_controller() 0 4 1
D Main.create_circuit() 0 98 12
C Main.execute_consistency() 0 24 10
D Main.update() 0 57 13
B Main.add_metadata() 0 30 6
C Main._evc_dict_with_instances() 0 41 9
A Main.on_link_down() 0 4 1
A Main.handle_evc_deployed() 0 7 3
A Main.update_schedule() 0 50 3
A Main._evc_from_dict() 0 3 1
A Main._find_evc_by_schedule_id() 0 21 5
A Main.load_all_evcs() 0 6 3
A Main.get_metadata() 0 10 2
A Main.delete_metadata() 0 11 2
A Main.handle_link_up() 0 7 5
A Main.on_link_up() 0 4 1
A Main.on_evc_affected_by_link_down() 0 4 1
A Main.delete_schedule() 0 35 3
A Main.handle_evc_affected_by_link_down() 0 13 4
F Main.handle_link_down() 0 69 15
A Main.on_evc_deployed() 0 4 1
A Main._json_from_request() 0 23 4
B Main.list_schedules() 0 31 5
A Main._link_from_dict() 0 20 4
A Main._uni_from_dict() 0 22 4
A Main.redeploy() 0 20 4
A Main.handle_flow_mod_error() 0 9 3
A Main._is_duplicated_evc() 0 14 4
A Main.on_flow_mod_error() 0 4 1
A Main.on_topology_loaded() 0 4 1
A Main._load_evc() 0 17 3
B Main.create_schedule() 0 76 7
A Main._get_circuits_buffer() 0 13 3
A Main.delete_circuit() 0 36 4

How to fix: Complexity

Complex classes like build.main often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
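
The prefix/suffix heuristic can be tried mechanically. The following is a minimal sketch, not part of the report's tooling: it takes a subset of the method names from the table above and groups them by their first and last underscore-separated token. The helper name `group_by_token` and the `min_size` threshold are assumptions made for illustration.

```python
from collections import defaultdict

# Subset of the method names listed in the "40 Methods" table.
METHOD_NAMES = [
    "list_circuits", "get_circuit", "create_circuit", "delete_circuit",
    "get_metadata", "add_metadata", "delete_metadata",
    "list_schedules", "create_schedule", "update_schedule", "delete_schedule",
    "on_link_up", "on_link_down", "handle_link_up", "handle_link_down",
]


def group_by_token(names, min_size=2):
    """Group names by their first and last underscore-separated token.

    Hypothetical helper: a token shared by at least `min_size` names is
    reported as a candidate component.
    """
    groups = defaultdict(list)
    for name in names:
        tokens = name.strip("_").split("_")
        # A set avoids counting a single-token name twice.
        for token in {tokens[0], tokens[-1]}:
            groups[token].append(name)
    return {
        token: sorted(members)
        for token, members in groups.items()
        if len(members) >= min_size
    }


groups = group_by_token(METHOD_NAMES)
for token, members in sorted(groups.items()):
    print(f"{token}: {', '.join(members)}")
```

Verb tokens such as `get` or `delete` also form groups, so the output still needs a human pass. The noun suffixes (`metadata`, `schedule`, `circuit`) are the ones that point at cohesive components.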

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
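
As an illustration of Extract Class, the sketch below moves schedule bookkeeping out of a large class into its own component and leaves the original class delegating to it. It is a self-contained toy, not the NApp's actual code: `ScheduleRegistry`, `SlimMain`, and the dict-based schedules are hypothetical stand-ins, and no Kytos API is used.

```python
class ScheduleRegistry:
    """Hypothetical extracted component: owns schedule bookkeeping only."""

    def __init__(self):
        # Maps circuit_id -> list of schedule dicts.
        self._by_circuit = {}

    def add(self, circuit_id, schedule):
        """Store a schedule for a circuit and return it."""
        self._by_circuit.setdefault(circuit_id, []).append(schedule)
        return schedule

    def find(self, schedule_id):
        """Return (circuit_id, schedule), or (None, None) if unknown."""
        for circuit_id, schedules in self._by_circuit.items():
            for schedule in schedules:
                if schedule["id"] == schedule_id:
                    return circuit_id, schedule
        return None, None

    def remove(self, schedule_id):
        """Remove a schedule; return True if it existed."""
        circuit_id, schedule = self.find(schedule_id)
        if schedule is None:
            return False
        self._by_circuit[circuit_id].remove(schedule)
        return True


class SlimMain:
    """Stand-in for the large class after extraction; it only delegates."""

    def __init__(self):
        self.schedules = ScheduleRegistry()

    def create_schedule(self, circuit_id, schedule):
        return self.schedules.add(circuit_id, schedule)

    def delete_schedule(self, schedule_id):
        return self.schedules.remove(schedule_id)


app = SlimMain()
app.create_schedule("aa:bb:cc", {"id": "s1", "action": "create"})
```

The point of the split is that the schedule logic can be tested without instantiating the entry-point class, and the entry-point class loses the complexity of the methods it no longer implements.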

# pylint: disable=protected-access
"""Main module of kytos/mef_eline Kytos Network Application.

NApp to provision circuits from user request.
"""
import time
from threading import Lock

from flask import jsonify, request
from werkzeug.exceptions import (BadRequest, Conflict, Forbidden,
                                 MethodNotAllowed, NotFound,
                                 UnsupportedMediaType)

from kytos.core import KytosNApp, log, rest
from kytos.core.events import KytosEvent
from kytos.core.helpers import listen_to
from kytos.core.interface import TAG, UNI
from kytos.core.link import Link
from napps.kytos.mef_eline import controllers, settings
from napps.kytos.mef_eline.exceptions import InvalidPath
from napps.kytos.mef_eline.models import EVC, DynamicPathManager, Path
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
from napps.kytos.mef_eline.utils import emit_event, load_spec, validate


# pylint: disable=too-many-public-methods
class Main(KytosNApp):
    """Main class of amlight/mef_eline NApp.

    This class is the entry point for this napp.
    """

    spec = load_spec()

    def setup(self):
        """Replace the '__init__' method for the KytosNApp subclass.

        The setup method is automatically called by the controller when your
        application is loaded.

        So, if you have any setup routine, insert it here.
        """
        # object used to schedule circuit events
        self.sched = Scheduler()

        # object to save and load circuits
        self.mongo_controller = self.get_eline_controller()
        self.mongo_controller.bootstrap_indexes()

        # set the controller that will manage the dynamic paths
        DynamicPathManager.set_controller(self.controller)

        # dictionary of EVCs created. It acts as a circuit buffer.
        # Every create/update/delete must be synced to mongodb.
        self.circuits = {}

        self._lock = Lock()
        self.execution_rounds = -1
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

        self.load_all_evcs()

    def get_evcs_by_svc_level(self) -> list:
        """Get circuits sorted by desc service level and asc creation_time.

        In the future, as more ops are offloaded, it should come from the DB.
        """
        return sorted(self.circuits.values(),
                      key=lambda x: (-x.service_level, x.creation_time))

    @staticmethod
    def get_eline_controller():
        """Return the ELineController instance."""
        return controllers.ELineController()

    def execute(self):
        """Execute once when the napp is running."""
        if self.execution_rounds < 0:
            self.execution_rounds += 1
            return
        if self._lock.locked():
            return
        log.debug("Starting consistency routine")
        with self._lock:
            self.execute_consistency()
        log.debug("Finished consistency routine")

    def execute_consistency(self):
        """Execute consistency routine."""
        self.execution_rounds += 1
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
        for circuit in self.get_evcs_by_svc_level():
            stored_circuits.pop(circuit.id, None)
            if (
                circuit.is_enabled()
                and not circuit.is_active()
                and not circuit.lock.locked()
            ):
                if circuit.check_traces():
                    log.info(f"{circuit} enabled but inactive - activating")
                    with circuit.lock:
                        circuit.activate()
                        circuit.sync()
                else:
                    if self.execution_rounds > settings.WAIT_FOR_OLD_PATH:
                        log.info(f"{circuit} enabled but inactive - redeploy")
                        with circuit.lock:
                            circuit.deploy()
        for circuit_id in stored_circuits:
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
            self._load_evc(stored_circuits[circuit_id])

    def shutdown(self):
        """Execute when your napp is unloaded.

        If you have some cleanup procedure, insert it here.
        """

    @rest("/v2/evc/", methods=["GET"])
    def list_circuits(self):
        """Endpoint to return circuits stored.

        If archived is set to True return all circuits, else only the ones
        not archived.
        """
        log.debug("list_circuits /v2/evc")
        archived = request.args.get("archived", False)
        circuits = self.mongo_controller.get_circuits()['circuits']
        if not circuits:
            return jsonify({}), 200
        if archived:
            return jsonify(circuits), 200
        return (
            jsonify(
                {
                    circuit_id: circuit
                    for circuit_id, circuit in circuits.items()
                    if not circuit.get("archived", False)
                }
            ),
            200,
        )

    @rest("/v2/evc/<circuit_id>", methods=["GET"])
    def get_circuit(self, circuit_id):
        """Endpoint to return a circuit based on id."""
        log.debug("get_circuit /v2/evc/%s", circuit_id)
        circuit = self.mongo_controller.get_circuit(circuit_id)
        if not circuit:
            result = f"circuit_id {circuit_id} not found"
            log.debug("get_circuit result %s %s", result, 404)
            raise NotFound(result)
        status = 200
        log.debug("get_circuit result %s %s", circuit, status)
        return jsonify(circuit), status

    @rest("/v2/evc/", methods=["POST"])
    @validate(spec)
    def create_circuit(self, data):
        """Try to create a new circuit.

        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
        UNI_Z's requested C-VID are available from the interfaces' pools. This
        is checked when creating the UNI object.

        Then, E-Line NApp requests a primary and a backup path from the
        Pathfinder NApp using the attributes primary_links and backup_links
        submitted via REST.

        For each link composing the paths above:
         - E-Line NApp requests an S-VID available from the link VLAN pool.
         - Using the S-VID obtained, generate abstract flow entries to be
           sent to FlowManager.

        Push abstract flow entries to FlowManager and FlowManager pushes
        OpenFlow entries to datapaths.

        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
        creation.

        Finally, notify the user of the status of their request.
        """
        # Try to create the circuit object
        log.debug("create_circuit /v2/evc/")

        try:
            evc = self._evc_from_dict(data)
        except ValueError as exception:
            log.debug("create_circuit result %s %s", exception, 400)
            # chain from the caught exception, not from the BadRequest class
            raise BadRequest(str(exception)) from exception

        if evc.primary_path:
            try:
                evc.primary_path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise BadRequest(
                    f"primary_path is not valid: {exception}"
                ) from exception
        if evc.backup_path:
            try:
                evc.backup_path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise BadRequest(
                    f"backup_path is not valid: {exception}"
                ) from exception

        # verify duplicated evc
        if self._is_duplicated_evc(evc):
            result = "The EVC already exists."
            log.debug("create_circuit result %s %s", result, 409)
            raise Conflict(result)

        if (
            not evc.primary_path
            and evc.dynamic_backup_path is False
            and evc.uni_a.interface.switch != evc.uni_z.interface.switch
        ):
            result = "The EVC must have a primary path or allow dynamic paths."
            log.debug("create_circuit result %s %s", result, 400)
            raise BadRequest(result)

        # store circuit in dictionary
        self.circuits[evc.id] = evc

        # save circuit
        evc.sync()

        # Schedule the circuit deploy
        self.sched.add(evc)

        # Circuit has no schedule, deploy now
        if not evc.circuit_scheduler:
            with evc.lock:
                evc.deploy()

        # Notify users
        event = KytosEvent(
            name="kytos.mef_eline.created", content=evc.as_dict()
        )
        self.controller.buffers.app.put(event)

        result = {"circuit_id": evc.id}
        status = 201
        log.debug("create_circuit result %s %s", result, status)
        emit_event(self.controller, "created", evc_id=evc.id)
        return jsonify(result), status

    @rest("/v2/evc/<circuit_id>", methods=["PATCH"])
    def update(self, circuit_id):
        """Update a circuit based on payload.

        The EVC required attributes (name, uni_a, uni_z) can't be updated.
        """
        log.debug("update /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError:
            result = f"circuit_id {circuit_id} not found"
            log.debug("update result %s %s", result, 404)
            raise NotFound(result) from NotFound

        if evc.archived:
            result = "Can't update archived EVC"
            log.debug("update result %s %s", result, 405)
            raise MethodNotAllowed(["GET"], result)

        try:
            data = request.get_json()
        except BadRequest:
            result = "The request body is not a well-formed JSON."
            log.debug("update result %s %s", result, 400)
            raise BadRequest(result) from BadRequest
        if data is None:
            result = "The request body mimetype is not application/json."
            log.debug("update result %s %s", result, 415)
            raise UnsupportedMediaType(result) from UnsupportedMediaType

        try:
            enable, redeploy = evc.update(
                **self._evc_dict_with_instances(data)
            )
        except ValueError as exception:
            log.error(exception)
            log.debug("update result %s %s", exception, 400)
            # chain from the caught exception, not from the BadRequest class
            raise BadRequest(str(exception)) from exception

        if evc.is_active():
            if enable is False:  # disable if active
                with evc.lock:
                    evc.remove()
            elif redeploy is not None:  # redeploy if active
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        else:
            if enable is True:  # enable if inactive
                with evc.lock:
                    evc.deploy()
        result = {evc.id: evc.as_dict()}
        status = 200

        log.debug("update result %s %s", result, status)
        emit_event(self.controller, "updated", evc_id=evc.id, data=data)
        return jsonify(result), status

    @rest("/v2/evc/<circuit_id>", methods=["DELETE"])
    def delete_circuit(self, circuit_id):
        """Remove a circuit.

        First, the flows are removed from the switches, and then the EVC is
        disabled.
        """
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError:
            result = f"circuit_id {circuit_id} not found"
            log.debug("delete_circuit result %s %s", result, 404)
            raise NotFound(result) from NotFound

        if evc.archived:
            result = f"Circuit {circuit_id} already removed"
            log.debug("delete_circuit result %s %s", result, 404)
            raise NotFound(result) from NotFound

        log.info("Removing %s", evc)
        with evc.lock:
            evc.remove_current_flows()
            evc.remove_failover_flows(sync=False)
            evc.deactivate()
            evc.disable()
            self.sched.remove(evc)
            evc.archive()
            evc.sync()
        log.info("EVC removed. %s", evc)
        result = {"response": f"Circuit {circuit_id} removed"}
        status = 200

        log.debug("delete_circuit result %s %s", result, status)
        emit_event(self.controller, "deleted", evc_id=evc.id)
        return jsonify(result), status

    @rest("v2/evc/<circuit_id>/metadata", methods=["GET"])
    def get_metadata(self, circuit_id):
        """Get metadata from an EVC."""
        try:
            return (
                jsonify({"metadata": self.circuits[circuit_id].metadata}),
                200,
            )
        except KeyError as error:
            raise NotFound(f"circuit_id {circuit_id} not found.") from error

    @rest("v2/evc/<circuit_id>/metadata", methods=["POST"])
    def add_metadata(self, circuit_id):
        """Add metadata to an EVC."""
        try:
            metadata = request.get_json()
            content_type = request.content_type
        except BadRequest as error:
            result = "The request body is not a well-formed JSON."
            raise BadRequest(result) from error
        if content_type is None:
            result = "The request body is empty."
            raise BadRequest(result)
        if metadata is None:
            if content_type != "application/json":
                result = (
                    "The content type must be application/json "
                    f"(received {content_type})."
                )
            else:
                result = "Metadata is empty."
            raise UnsupportedMediaType(result)

        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise NotFound(f"circuit_id {circuit_id} not found.") from error

        evc.extend_metadata(metadata)
        evc.sync()
        return jsonify("Operation successful"), 201

    @rest("v2/evc/<circuit_id>/metadata/<key>", methods=["DELETE"])
    def delete_metadata(self, circuit_id, key):
        """Delete metadata from an EVC."""
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise NotFound(f"circuit_id {circuit_id} not found.") from error

        evc.remove_metadata(key)
        evc.sync()
        return jsonify("Operation successful"), 200

    @rest("/v2/evc/<circuit_id>/redeploy", methods=["PATCH"])
    def redeploy(self, circuit_id):
        """Endpoint to force the redeployment of an EVC."""
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError:
            result = f"circuit_id {circuit_id} not found"
            raise NotFound(result) from NotFound
        if evc.is_enabled():
            with evc.lock:
                evc.remove_current_flows()
                evc.deploy()
            result = {"response": f"Circuit {circuit_id} redeploy received."}
            status = 202
        else:
            result = {"response": f"Circuit {circuit_id} is disabled."}
            status = 409

        return jsonify(result), status

    @rest("/v2/evc/schedule", methods=["GET"])
    def list_schedules(self):
        """Endpoint to return all schedules stored for all circuits.

        Return a JSON with the following template:
        [{"schedule_id": <schedule_id>,
         "circuit_id": <circuit_id>,
         "schedule": <schedule object>}]
        """
        log.debug("list_schedules /v2/evc/schedule")
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
        if not circuits:
            result = {}
            status = 200
            return jsonify(result), status

        result = []
        status = 200
        for circuit in circuits:
            circuit_scheduler = circuit.get("circuit_scheduler")
            if circuit_scheduler:
                for scheduler in circuit_scheduler:
                    value = {
                        "schedule_id": scheduler.get("id"),
                        "circuit_id": circuit.get("id"),
                        "schedule": scheduler,
                    }
                    result.append(value)

        log.debug("list_schedules result %s %s", result, status)
        return jsonify(result), status

    @rest("/v2/evc/schedule/", methods=["POST"])
    def create_schedule(self):
        """
        Create a new schedule for a given circuit.

        This service does not check for conflicts with other schedules.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }
        """
        log.debug("create_schedule /v2/evc/schedule/")

        json_data = self._json_from_request("create_schedule")
        try:
            circuit_id = json_data["circuit_id"]
        except TypeError:
            result = "The payload should have a dictionary."
            log.debug("create_schedule result %s %s", result, 400)
            raise BadRequest(result) from BadRequest
        except KeyError:
            result = "Missing circuit_id."
            log.debug("create_schedule result %s %s", result, 400)
            raise BadRequest(result) from BadRequest

        try:
            schedule_data = json_data["schedule"]
        except KeyError:
            result = "Missing schedule data."
            log.debug("create_schedule result %s %s", result, 400)
            raise BadRequest(result) from BadRequest

        # Get EVC from circuits buffer
        circuits = self._get_circuits_buffer()

        # get the circuit
        evc = circuits.get(circuit_id)

        # the circuit must exist
        if not evc:
            result = f"circuit_id {circuit_id} not found"
            log.debug("create_schedule result %s %s", result, 404)
            raise NotFound(result) from NotFound
        # Deleted (archived) circuits cannot be modified
        if evc.archived:
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
            log.debug("create_schedule result %s %s", result, 403)
            raise Forbidden(result) from Forbidden

        # new schedule from dict
        new_schedule = CircuitSchedule.from_dict(schedule_data)

        # If there is no schedule, create the list
        if not evc.circuit_scheduler:
            evc.circuit_scheduler = []

        # Add the new schedule
        evc.circuit_scheduler.append(new_schedule)

        # Add schedule job
        self.sched.add_circuit_job(evc, new_schedule)

        # save circuit to mongodb
        evc.sync()

        result = new_schedule.as_dict()
        status = 201

        log.debug("create_schedule result %s %s", result, status)
        return jsonify(result), status

    @rest("/v2/evc/schedule/<schedule_id>", methods=["PATCH"])
    def update_schedule(self, schedule_id):
        """Update a schedule.

        Change all attributes of the given schedule of an EVC.
        The schedule ID is preserved by default.
        Payload example:
            {
              "date": "2019-08-07T14:52:10.967Z",
              "interval": "string",
              "frequency": "1 * * * *",
              "action": "create"
            }
        """
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)

        # Try to find a circuit schedule
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)

        # The schedule must exist and its circuit must not be archived
        if not found_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("update_schedule result %s %s", result, 404)
            raise NotFound(result) from NotFound
        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("update_schedule result %s %s", result, 403)
            raise Forbidden(result) from Forbidden

        data = self._json_from_request("update_schedule")

        new_schedule = CircuitSchedule.from_dict(data)
        new_schedule.id = found_schedule.id
        # Remove the old schedule
        evc.circuit_scheduler.remove(found_schedule)
        # Append the modified schedule
        evc.circuit_scheduler.append(new_schedule)

        # Cancel all schedule jobs
        self.sched.cancel_job(found_schedule.id)
        # Add the new circuit schedule
        self.sched.add_circuit_job(evc, new_schedule)
        # Save EVC to mongodb
        evc.sync()

        result = new_schedule.as_dict()
        status = 200

        log.debug("update_schedule result %s %s", result, status)
        return jsonify(result), status

    @rest("/v2/evc/schedule/<schedule_id>", methods=["DELETE"])
    def delete_schedule(self, schedule_id):
        """Remove a circuit schedule.

        Remove the Schedule from EVC.
        Remove the Schedule from cron job.
        Save the EVC to mongodb.
        """
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)

        # The schedule must exist and its circuit must not be archived
        if not found_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("delete_schedule result %s %s", result, 404)
            raise NotFound(result)

        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("delete_schedule result %s %s", result, 403)
            raise Forbidden(result)

        # Remove the old schedule
        evc.circuit_scheduler.remove(found_schedule)

        # Cancel all schedule jobs
        self.sched.cancel_job(found_schedule.id)
        # Save EVC to mongodb
        evc.sync()

        result = "Schedule removed"
        status = 200

        log.debug("delete_schedule result %s %s", result, status)
        return jsonify(result), status

    def _is_duplicated_evc(self, evc):
        """Check whether the given circuit duplicates a stored EVC.

        Args:
            evc (EVC): circuit to be analysed.

        Returns:
            boolean: True if the circuit is duplicated, otherwise False.

        """
        for circuit in tuple(self.circuits.values()):
            if not circuit.archived and circuit.shares_uni(evc):
                return True
        return False

    @listen_to("kytos/topology.link_up")
    def on_link_up(self, event):
        """Change circuit when link is up or end_maintenance."""
        self.handle_link_up(event)

    def handle_link_up(self, event):
        """Change circuit when link is up or end_maintenance."""
        log.debug("Event handle_link_up %s", event)
        for evc in self.get_evcs_by_svc_level():
            if evc.is_enabled() and not evc.archived:
                with evc.lock:
                    evc.handle_link_up(event.content["link"])

    @listen_to("kytos/topology.link_down")
    def on_link_down(self, event):
        """Change circuit when link is down or under_maintenance."""
        self.handle_link_down(event)

    def handle_link_down(self, event):
        """Change circuit when link is down or under_maintenance."""
        link = event.content["link"]
        log.info("Event handle_link_down %s", link)
        switch_flows = {}
        evcs_with_failover = []
        evcs_normal = []
        check_failover = []
        for evc in self.get_evcs_by_svc_level():
            if evc.is_affected_by_link(link):
                # if there is no failover path, handle link down the
                # traditional way
                if (
                    not getattr(evc, 'failover_path', None) or
                    evc.is_failover_path_affected_by_link(link)
                ):
                    evcs_normal.append(evc)
                    continue
                for dpid, flows in evc.get_failover_flows().items():
                    switch_flows.setdefault(dpid, [])
                    switch_flows[dpid].extend(flows)
                evcs_with_failover.append(evc)
            else:
                check_failover.append(evc)

        offset = 0
        while switch_flows:
            offset = (offset + settings.BATCH_SIZE) or None
            switches = list(switch_flows.keys())
            for dpid in switches:
                emit_event(
                    self.controller,
                    context="kytos.flow_manager",
                    name="flows.install",
                    dpid=dpid,
                    flow_dict={"flows": switch_flows[dpid][:offset]},
                )
                if offset is None or offset >= len(switch_flows[dpid]):
                    del switch_flows[dpid]
                    continue
                switch_flows[dpid] = switch_flows[dpid][offset:]
            time.sleep(settings.BATCH_INTERVAL)

        for evc in evcs_with_failover:
            with evc.lock:
                old_path = evc.current_path
                evc.current_path = evc.failover_path
                evc.failover_path = old_path
                evc.sync()
            emit_event(self.controller, "redeployed_link_down", evc_id=evc.id)
            log.info(
                f"{evc} redeployed with failover due to link down {link.id}"
            )

        for evc in evcs_normal:
            emit_event(
                self.controller,
                "evc_affected_by_link_down",
                evc_id=evc.id,
                link_id=link.id,
            )

        # After handling the hot path, check if new failover paths are needed.
        # Note that EVCs affected by link down will generate a KytosEvent for
        # deployed|redeployed, which will trigger the failover path setup.
        # Thus, we just need to further check the check_failover list
        for evc in check_failover:
            if evc.is_failover_path_affected_by_link(link):
                evc.setup_failover_path()

    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
    def on_evc_affected_by_link_down(self, event):
        """Change circuit when link is down or under_maintenance."""
        self.handle_evc_affected_by_link_down(event)

    def handle_evc_affected_by_link_down(self, event):
        """Change circuit when link is down or under_maintenance."""
        evc = self.circuits.get(event.content["evc_id"])
        link_id = event.content['link_id']
        if not evc:
            return
        with evc.lock:
            result = evc.handle_link_down()
        event_name = "error_redeploy_link_down"
        if result:
            log.info(f"{evc} redeployed due to link down {link_id}")
            event_name = "redeployed_link_down"
        emit_event(self.controller, event_name, evc_id=evc.id)

    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
    def on_evc_deployed(self, event):
        """Handle EVC deployed|redeployed_link_(up|down)."""
        self.handle_evc_deployed(event)

    def handle_evc_deployed(self, event):
        """Set up the failover path once an EVC is deployed."""
        evc = self.circuits.get(event.content["evc_id"])
        if not evc:
            return
        with evc.lock:
            evc.setup_failover_path()

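The `@listen_to` pattern on `on_evc_deployed` covers three event names at once. The sketch below checks which names it accepts, under the assumption that the pattern is treated as a regular expression that must match the whole event name; `is_handled` is a hypothetical helper for illustration and is not a Kytos API.

```python
import re

# Pattern copied from the @listen_to decorator in the listing.
PATTERN = "kytos/mef_eline.(redeployed_link_(up|down)|deployed)"


def is_handled(event_name):
    """Return True if event_name matches PATTERN from start to end.

    Assumption: the listener pattern is an anchored regular expression.
    The unescaped "." matches any single character, which is harmless here.
    """
    return re.fullmatch(PATTERN, event_name) is not None


names = [
    "kytos/mef_eline.deployed",
    "kytos/mef_eline.redeployed_link_up",
    "kytos/mef_eline.redeployed_link_down",
    "kytos/mef_eline.error_redeploy_link_down",
]
for name in names:
    print(name, is_handled(name))
```

Under that assumption, `error_redeploy_link_down` is not matched, so a failed redeploy does not trigger `setup_failover_path()`.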
    @listen_to("kytos/topology.topology_loaded")
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
        """Load EVCs once the topology is available."""
        self.load_all_evcs()

    def load_all_evcs(self):
        """Try to load all EVCs on startup."""
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
        for circuit_id, circuit in circuits:
            if circuit_id not in self.circuits:
                self._load_evc(circuit)

    def _load_evc(self, circuit_dict):
        """Load one EVC from mongodb to memory."""
        try:
            evc = self._evc_from_dict(circuit_dict)
        except ValueError as exception:
            log.error(
                f"Could not load EVC: dict={circuit_dict} error={exception}"
            )
            return None

        if evc.archived:
            return None
        evc.deactivate()
        evc.sync()
        self.circuits.setdefault(evc.id, evc)
        self.sched.add(evc)
        return evc

    @listen_to("kytos/flow_manager.flow.error")
    def on_flow_mod_error(self, event):
        """Handle flow mod errors related to an EVC."""
        self.handle_flow_mod_error(event)

    def handle_flow_mod_error(self, event):
        """Handle flow mod errors related to an EVC."""
        flow = event.content["flow"]
        command = event.content.get("error_command")
        if command != "add":
            return
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
        if evc:
            evc.remove_current_flows()

    def _evc_dict_with_instances(self, evc_dict):
        """Convert some dict values to instances of EVC classes.

        This method will convert: [UNI, Link]
        """
        data = evc_dict.copy()  # Do not modify the original dict
        for attribute, value in data.items():
            # Get multiple attributes.
            # Ex: uni_a, uni_z
            if "uni" in attribute:
                try:
                    data[attribute] = self._uni_from_dict(value)
                except ValueError as exception:
                    result = "Error creating UNI: Invalid value"
                    raise ValueError(result) from exception

            if attribute == "circuit_scheduler":
                data[attribute] = []
                for schedule in value:
                    data[attribute].append(CircuitSchedule.from_dict(schedule))

            # Get multiple attributes.
            # Ex: primary_links,
            #     backup_links,
            #     current_links_cache,
            #     primary_links_cache,
            #     backup_links_cache
            if "links" in attribute:
                data[attribute] = [
                    self._link_from_dict(link) for link in value
                ]

            # Ex: current_path,
            #     primary_path,
            #     backup_path
            if "path" in attribute and attribute != "dynamic_backup_path":
                data[attribute] = Path(
                    [self._link_from_dict(link) for link in value]
                )

        return data

    def _evc_from_dict(self, evc_dict):
        data = self._evc_dict_with_instances(evc_dict)
        return EVC(self.controller, **data)

    def _uni_from_dict(self, uni_dict):
        """Return a UNI object from python dict."""
        if uni_dict is None:
            return False

        interface_id = uni_dict.get("interface_id")
        interface = self.controller.get_interface_by_id(interface_id)
        if interface is None:
            result = (
                "Error creating UNI: "
                f"Could not instantiate interface {interface_id}"
            )
            raise ValueError(result)

        tag_dict = uni_dict.get("tag")
        if tag_dict:
            tag = TAG.from_dict(tag_dict)
        else:
            tag = None
        uni = UNI(interface, tag)

        return uni

    def _link_from_dict(self, link_dict):
        """Return a Link object from python dict."""
        id_a = link_dict.get("endpoint_a").get("id")
        id_b = link_dict.get("endpoint_b").get("id")

        endpoint_a = self.controller.get_interface_by_id(id_a)
        endpoint_b = self.controller.get_interface_by_id(id_b)

        link = Link(endpoint_a, endpoint_b)
        if "metadata" in link_dict:
            link.extend_metadata(link_dict.get("metadata"))

        s_vlan = link.get_metadata("s_vlan")
        if s_vlan:
            tag = TAG.from_dict(s_vlan)
            if tag is False:
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
                raise ValueError(error_msg)
            link.update_metadata("s_vlan", tag)
        return link

    def _find_evc_by_schedule_id(self, schedule_id):
        """
        Find an EVC and CircuitSchedule based on schedule_id.

        :param schedule_id: Schedule ID
        :return: EVC and Schedule
        """
        circuits = self._get_circuits_buffer()
        found_schedule = None
        evc = None

        for circuit in circuits.values():
            for schedule in circuit.circuit_scheduler:
                if schedule.id == schedule_id:
                    found_schedule = schedule
                    evc = circuit
                    break
            if found_schedule:
                break
        return evc, found_schedule

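The nested search in `_find_evc_by_schedule_id` can also be written as a single generator expression consumed with `next`. The following is only a sketch of that alternative, not the project's implementation: `Schedule` and `Circuit` are hypothetical stand-ins for `CircuitSchedule` and `EVC`, and `find_by_schedule_id` is an illustrative name.

```python
from dataclasses import dataclass, field


@dataclass
class Schedule:  # hypothetical stand-in for CircuitSchedule
    id: str


@dataclass
class Circuit:  # hypothetical stand-in for EVC
    name: str
    circuit_scheduler: list = field(default_factory=list)


def find_by_schedule_id(circuits, schedule_id):
    """Return (circuit, schedule) for the first match, else (None, None)."""
    matches = (
        (circuit, schedule)
        for circuit in circuits.values()
        for schedule in circuit.circuit_scheduler
        if schedule.id == schedule_id
    )
    # next() stops at the first hit, like the two break statements do.
    return next(matches, (None, None))


circuits = {
    "a": Circuit("a", [Schedule("1")]),
    "b": Circuit("b", [Schedule("2"), Schedule("3")]),
}
evc, schedule = find_by_schedule_id(circuits, "3")
print(evc.name, schedule.id)
```

The generator is lazy, so the search still stops at the first matching schedule instead of scanning every circuit.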
    def _get_circuits_buffer(self):
        """
        Return the circuit buffer.

        If the buffer is empty, try to load data from mongodb.
        """
        if not self.circuits:
            # Load circuits from mongodb to buffer
            circuits = self.mongo_controller.get_circuits()['circuits']
            for c_id, circuit in circuits.items():
                evc = self._evc_from_dict(circuit)
                self.circuits[c_id] = evc
        return self.circuits

    @staticmethod
    def _json_from_request(caller):
        """Return the JSON payload of the request.

        If the JSON cannot be obtained from the request, log the caller and
        the error that occurred (for debugging) and raise an exception.
        """
        try:
            json_data = request.get_json()
        except ValueError as exception:
            log.error(exception)
            log.debug(f"{caller} result {exception} 400")
            # Chain to the caught exception, not to the BadRequest class.
            raise BadRequest(str(exception)) from exception
        except BadRequest as exception:
            result = "The request is not a valid JSON."
            log.debug(f"{caller} result {result} 400")
            raise BadRequest(result) from exception
        if json_data is None:
            result = "Content-Type must be application/json"
            log.debug(f"{caller} result {result} 415")
            raise UnsupportedMediaType(result)
        return json_data
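`_json_from_request` separates two failure modes: a wrong content type (415) and an undecodable body (400). The sketch below shows the same split using only the standard library, so it runs without Flask. The exception classes here are hypothetical stand-ins for the Werkzeug ones used in the listing, and `json_from_body` is an illustrative name, not part of mef_eline.

```python
import json


class BadRequest(Exception):  # hypothetical stand-in, HTTP 400
    code = 400


class UnsupportedMediaType(Exception):  # hypothetical stand-in, HTTP 415
    code = 415


def json_from_body(content_type, body):
    """Decode a request body, mapping failures to 415 or 400."""
    if content_type != "application/json":
        raise UnsupportedMediaType("Content-Type must be application/json")
    try:
        return json.loads(body)
    except ValueError as exception:  # JSONDecodeError subclasses ValueError
        # "from exception" keeps the decoder error as __cause__.
        raise BadRequest("The request is not a valid JSON.") from exception


print(json_from_body("application/json", '{"name": "evc1"}'))
```

Chaining with `from exception` preserves the original decoder error in the traceback, which is what a debug log needs.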