Passed
Pull Request — master (#197)
by Vinicius
03:28
created

build.main   F

Complexity

Total Complexity 161

Size/Duplication

Total Lines 950
Duplicated Lines 0 %

Test Coverage

Coverage 92.39%

Importance

Changes 0
Metric Value
wmc 161
eloc 603
dl 0
loc 950
rs 1.997
c 0
b 0
f 0
ccs 510
cts 552
cp 0.9239

40 Methods

Rating   Name   Duplication   Size   Complexity  
A Main.setup() 0 27 1
A Main.list_circuits() 0 23 3
B Main.add_metadata() 0 30 6
C Main._evc_dict_with_instances() 0 41 9
A Main.on_link_down() 0 4 1
A Main.handle_evc_deployed() 0 7 3
A Main.update_schedule() 0 50 3
A Main._evc_from_dict() 0 3 1
A Main._find_evc_by_schedule_id() 0 21 5
A Main.load_all_evcs() 0 6 3
A Main.get_metadata() 0 10 2
A Main.delete_metadata() 0 11 2
A Main.handle_link_up() 0 7 5
A Main.on_link_up() 0 4 1
A Main.on_evc_affected_by_link_down() 0 4 1
A Main.delete_schedule() 0 35 3
A Main.get_circuit() 0 12 2
A Main.shutdown() 0 2 1
A Main.handle_evc_affected_by_link_down() 0 13 4
F Main.handle_link_down() 0 69 15
A Main.on_evc_deployed() 0 4 1
A Main.execute() 0 11 4
A Main.get_evcs_by_svc_level() 0 7 2
A Main.get_eline_controller() 0 4 1
A Main._json_from_request() 0 23 4
B Main.list_schedules() 0 31 5
D Main.create_circuit() 0 98 12
A Main._link_from_dict() 0 20 4
A Main._uni_from_dict() 0 22 4
A Main.redeploy() 0 20 4
A Main.handle_flow_mod_error() 0 9 3
A Main._is_duplicated_evc() 0 14 4
C Main.execute_consistency() 0 24 10
D Main.update() 0 57 13
A Main.on_flow_mod_error() 0 4 1
A Main.on_topology_loaded() 0 4 1
A Main._load_evc() 0 17 3
B Main.create_schedule() 0 76 7
A Main._get_circuits_buffer() 0 13 3
A Main.delete_circuit() 0 35 4

Complexity

Complex classes like build.main often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common way to find such a component is to look for fields or methods that share a prefix or suffix.
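As a rough illustration of the prefix/suffix heuristic, the sketch below groups a handful of method names from the table above by their last underscore-separated word. The helper name `group_by_suffix` and the naive plural handling are assumptions made for this example, not part of any analysis tool.

```python
from collections import defaultdict

# A few method names copied from the table above.
METHOD_NAMES = [
    "list_schedules", "create_schedule", "update_schedule", "delete_schedule",
    "get_metadata", "add_metadata", "delete_metadata",
    "list_circuits", "get_circuit", "create_circuit", "delete_circuit",
    "setup", "shutdown",
]


def group_by_suffix(names):
    """Group method names by their last underscore-separated word."""
    groups = defaultdict(list)
    for name in names:
        suffix = name.rsplit("_", 1)[-1]
        # Naive plural handling so "schedules" and "schedule" land together.
        if suffix.endswith("s"):
            suffix = suffix[:-1]
        groups[suffix].append(name)
    # A suffix used by a single method does not suggest a component.
    return {
        suffix: found for suffix, found in groups.items() if len(found) > 1
    }


candidates = group_by_suffix(METHOD_NAMES)
for suffix, found in candidates.items():
    print(f"{suffix}: {', '.join(found)}")
```

Each resulting group (schedule, metadata, circuit) is only a candidate; whether it forms a cohesive component still depends on which fields the methods share.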

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
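A minimal sketch of Extract Class, assuming the metadata methods were chosen as the component. `MetadataStore` is a hypothetical name, and plain dictionaries stand in for the real EVC objects so the example runs on its own; it does not reproduce the NApp's actual REST handling.

```python
class MetadataStore:
    """Hypothetical extracted class: owns only the metadata operations."""

    def __init__(self, circuits):
        # Shared mapping of circuit_id -> circuit data (a dict stand-in).
        self._circuits = circuits

    def get(self, circuit_id):
        return self._circuits[circuit_id]["metadata"]

    def add(self, circuit_id, metadata):
        self._circuits[circuit_id]["metadata"].update(metadata)

    def delete(self, circuit_id, key):
        self._circuits[circuit_id]["metadata"].pop(key, None)


class Main:
    """Stand-in for the original class; it now delegates metadata work."""

    def __init__(self):
        self.circuits = {}
        self.metadata = MetadataStore(self.circuits)

    def get_metadata(self, circuit_id):
        return self.metadata.get(circuit_id)

    def add_metadata(self, circuit_id, metadata):
        self.metadata.add(circuit_id, metadata)

    def delete_metadata(self, circuit_id, key):
        self.metadata.delete(circuit_id, key)


main = Main()
main.circuits["evc1"] = {"metadata": {}}
main.add_metadata("evc1", {"owner": "noc"})
print(main.get_metadata("evc1"))
```

The public methods stay on `Main`, so callers are unaffected, while the logic they used to contain moves into the smaller class.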

# pylint: disable=protected-access
"""Main module of kytos/mef_eline Kytos Network Application.

NApp to provision circuits from user request.
"""
import time
from threading import Lock

from flask import jsonify, request
from werkzeug.exceptions import (BadRequest, Conflict, Forbidden,
                                 MethodNotAllowed, NotFound,
                                 UnsupportedMediaType)

from kytos.core import KytosNApp, log, rest
from kytos.core.events import KytosEvent
from kytos.core.helpers import listen_to
from kytos.core.interface import TAG, UNI
from kytos.core.link import Link
from napps.kytos.mef_eline import controllers, settings
from napps.kytos.mef_eline.exceptions import InvalidPath
from napps.kytos.mef_eline.models import EVC, DynamicPathManager, Path
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
from napps.kytos.mef_eline.utils import emit_event, load_spec, validate


# pylint: disable=too-many-public-methods
class Main(KytosNApp):
    """Main class of amlight/mef_eline NApp.

    This class is the entry point for this napp.
    """

    spec = load_spec()

    def setup(self):
        """Replace the '__init__' method for the KytosNApp subclass.

        The setup method is automatically called by the controller when your
        application is loaded.

        So, if you have any setup routine, insert it here.
        """
        # object used to schedule circuit events
        self.sched = Scheduler()

        # object to save and load circuits
        self.mongo_controller = self.get_eline_controller()
        self.mongo_controller.bootstrap_indexes()

        # set the controller that will manage the dynamic paths
        DynamicPathManager.set_controller(self.controller)

        # dictionary of EVCs created. It acts as a circuit buffer.
        # Every create/update/delete must be synced to mongodb.
        self.circuits = {}

        self._lock = Lock()
        self.execution_rounds = -1
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

        self.load_all_evcs()

    def get_evcs_by_svc_level(self) -> list:
        """Get circuits sorted by desc service level and asc creation_time.

        In the future, as more ops are offloaded, this should be fetched from
        the DB.
        """
        return sorted(self.circuits.values(),
                      key=lambda x: (-x.service_level, x.creation_time))

    @staticmethod
    def get_eline_controller():
        """Return the ELineController instance."""
        return controllers.ELineController()

    def execute(self):
        """Run the consistency routine on each loop iteration.

        The loop is started by execute_as_loop in setup; the first round is
        skipped, and a round is also skipped while a previous one still holds
        the lock.
        """
        if self.execution_rounds < 0:
            self.execution_rounds += 1
            return
        if self._lock.locked():
            return
        log.debug("Starting consistency routine")
        with self._lock:
            self.execute_consistency()
        log.debug("Finished consistency routine")

    def execute_consistency(self):
        """Execute consistency routine."""
        self.execution_rounds += 1
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
        for circuit in self.get_evcs_by_svc_level():
            stored_circuits.pop(circuit.id, None)
            if (
                circuit.is_enabled()
                and not circuit.is_active()
                and not circuit.lock.locked()
            ):
                if circuit.check_traces():
                    log.info(f"{circuit} enabled but inactive - activating")
                    with circuit.lock:
                        circuit.activate()
                        circuit.sync()
                else:
                    if self.execution_rounds > settings.WAIT_FOR_OLD_PATH:
                        log.info(f"{circuit} enabled but inactive - redeploy")
                        with circuit.lock:
                            circuit.deploy()
        for circuit_id in stored_circuits:
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
            self._load_evc(stored_circuits[circuit_id])

    def shutdown(self):
        """Execute when your napp is unloaded.

        If you have some cleanup procedure, insert it here.
        """

    @rest("/v2/evc/", methods=["GET"])
    def list_circuits(self):
        """Endpoint to return the stored circuits.

        If the archived query argument is set to any non-empty value, return
        all circuits; otherwise return only the ones not archived.
        """
        log.debug("list_circuits /v2/evc")
        archived = request.args.get("archived", False)
        circuits = self.mongo_controller.get_circuits()['circuits']
        if not circuits:
            return jsonify({}), 200
        if archived:
            return jsonify(circuits), 200
        return (
            jsonify(
                {
                    circuit_id: circuit
                    for circuit_id, circuit in circuits.items()
                    if not circuit.get("archived", False)
                }
            ),
            200,
        )

    @rest("/v2/evc/<circuit_id>", methods=["GET"])
    def get_circuit(self, circuit_id):
        """Endpoint to return a circuit based on id."""
        log.debug("get_circuit /v2/evc/%s", circuit_id)
        circuit = self.mongo_controller.get_circuit(circuit_id)
        if not circuit:
            result = f"circuit_id {circuit_id} not found"
            log.debug("get_circuit result %s %s", result, 404)
            raise NotFound(result)
        status = 200
        log.debug("get_circuit result %s %s", circuit, status)
        return jsonify(circuit), status

    @rest("/v2/evc/", methods=["POST"])
    @validate(spec)
    def create_circuit(self, data):
        """Try to create a new circuit.

        First, for EVPL, E-Line NApp verifies that UNI_A's requested C-VID and
        UNI_Z's requested C-VID are available from the interfaces' pools. This
        is checked when creating the UNI object.

        Then, E-Line NApp requests a primary and a backup path from the
        Pathfinder NApp using the attributes primary_links and backup_links
        submitted via REST.

        For each link composing those paths:
         - E-Line NApp requests an S-VID available from the link VLAN pool.
         - Using the S-VID obtained, it generates abstract flow entries to be
           sent to FlowManager.

        The abstract flow entries are pushed to FlowManager, which pushes
        OpenFlow entries to the datapaths.

        E-Line NApp generates an event to notify all Kytos NApps of the new
        EVC creation.

        Finally, the user is notified of the status of the request.
        """
        # Try to create the circuit object
        log.debug("create_circuit /v2/evc/")

        try:
            evc = self._evc_from_dict(data)
        except ValueError as exception:
            log.debug("create_circuit result %s %s", exception, 400)
            raise BadRequest(str(exception)) from exception

        if evc.primary_path:
            try:
                evc.primary_path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise BadRequest(
                    f"primary_path is not valid: {exception}"
                ) from exception
        if evc.backup_path:
            try:
                evc.backup_path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise BadRequest(
                    f"backup_path is not valid: {exception}"
                ) from exception

        # verify duplicated evc
        if self._is_duplicated_evc(evc):
            result = "The EVC already exists."
            log.debug("create_circuit result %s %s", result, 409)
            raise Conflict(result)

        if (
            not evc.primary_path
            and evc.dynamic_backup_path is False
            and evc.uni_a.interface.switch != evc.uni_z.interface.switch
        ):
            result = "The EVC must have a primary path or allow dynamic paths."
            log.debug("create_circuit result %s %s", result, 400)
            raise BadRequest(result)

        # store circuit in dictionary
        self.circuits[evc.id] = evc

        # save circuit
        evc.sync()

        # Schedule the circuit deploy
        self.sched.add(evc)

        # Circuit has no schedule, deploy now
        if not evc.circuit_scheduler:
            with evc.lock:
                evc.deploy()

        # Notify users
        event = KytosEvent(
            name="kytos.mef_eline.created", content=evc.as_dict()
        )
        self.controller.buffers.app.put(event)

        result = {"circuit_id": evc.id}
        status = 201
        log.debug("create_circuit result %s %s", result, status)
        emit_event(self.controller, "created", evc_id=evc.id)
        return jsonify(result), status

    @rest("/v2/evc/<circuit_id>", methods=["PATCH"])
    def update(self, circuit_id):
        """Update a circuit based on payload.

        The EVC required attributes (name, uni_a, uni_z) can't be updated.
        """
        log.debug("update /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("update result %s %s", result, 404)
            raise NotFound(result) from error

        if evc.archived:
            result = "Can't update archived EVC"
            log.debug("update result %s %s", result, 405)
            raise MethodNotAllowed(["GET"], result)

        try:
            data = request.get_json()
        except BadRequest as error:
            result = "The request body is not a well-formed JSON."
            log.debug("update result %s %s", result, 400)
            raise BadRequest(result) from error
        if data is None:
            result = "The request body mimetype is not application/json."
            log.debug("update result %s %s", result, 415)
            raise UnsupportedMediaType(result)

        try:
            enable, redeploy = evc.update(
                **self._evc_dict_with_instances(data)
            )
        except ValueError as exception:
            log.error(exception)
            log.debug("update result %s %s", exception, 400)
            raise BadRequest(str(exception)) from exception

        if evc.is_active():
            if enable is False:  # disable if active
                with evc.lock:
                    evc.remove()
            elif redeploy is not None:  # redeploy if active
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        else:
            if enable is True:  # enable if inactive
                with evc.lock:
                    evc.deploy()
        result = {evc.id: evc.as_dict()}
        status = 200

        log.debug("update result %s %s", result, status)
        emit_event(self.controller, "updated", evc_id=evc.id, data=data)
        return jsonify(result), status

    @rest("/v2/evc/<circuit_id>", methods=["DELETE"])
    def delete_circuit(self, circuit_id):
        """Remove a circuit.

        First, the flows are removed from the switches, and then the EVC is
        disabled and archived.
        """
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("delete_circuit result %s %s", result, 404)
            raise NotFound(result) from error

        if evc.archived:
            result = f"Circuit {circuit_id} already removed"
            log.debug("delete_circuit result %s %s", result, 404)
            raise NotFound(result)

        log.info("Removing %s", evc)
        with evc.lock:
            evc.remove_current_flows()
            evc.deactivate()
            evc.disable()
            self.sched.remove(evc)
            evc.archive()
            evc.sync()
        log.info("EVC removed. %s", evc)
        result = {"response": f"Circuit {circuit_id} removed"}
        status = 200

        log.debug("delete_circuit result %s %s", result, status)
        emit_event(self.controller, "deleted", evc_id=evc.id)
        return jsonify(result), status

    @rest("v2/evc/<circuit_id>/metadata", methods=["GET"])
    def get_metadata(self, circuit_id):
        """Get metadata from an EVC."""
        try:
            return (
                jsonify({"metadata": self.circuits[circuit_id].metadata}),
                200,
            )
        except KeyError as error:
            raise NotFound(f"circuit_id {circuit_id} not found.") from error

    @rest("v2/evc/<circuit_id>/metadata", methods=["POST"])
    def add_metadata(self, circuit_id):
        """Add metadata to an EVC."""
        try:
            metadata = request.get_json()
            content_type = request.content_type
        except BadRequest as error:
            result = "The request body is not a well-formed JSON."
            raise BadRequest(result) from error
        if content_type is None:
            result = "The request body is empty."
            raise BadRequest(result)
        if metadata is None:
            if content_type != "application/json":
                result = (
                    "The content type must be application/json "
                    f"(received {content_type})."
                )
            else:
                result = "Metadata is empty."
            raise UnsupportedMediaType(result)

        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise NotFound(f"circuit_id {circuit_id} not found.") from error

        evc.extend_metadata(metadata)
        evc.sync()
        return jsonify("Operation successful"), 201

    @rest("v2/evc/<circuit_id>/metadata/<key>", methods=["DELETE"])
    def delete_metadata(self, circuit_id, key):
        """Delete metadata from an EVC."""
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise NotFound(f"circuit_id {circuit_id} not found.") from error

        evc.remove_metadata(key)
        evc.sync()
        return jsonify("Operation successful"), 200

    @rest("/v2/evc/<circuit_id>/redeploy", methods=["PATCH"])
    def redeploy(self, circuit_id):
        """Endpoint to force the redeployment of an EVC."""
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            raise NotFound(result) from error
        if evc.is_enabled():
            with evc.lock:
                evc.remove_current_flows()
                evc.deploy()
            result = {"response": f"Circuit {circuit_id} redeploy received."}
            status = 202
        else:
            result = {"response": f"Circuit {circuit_id} is disabled."}
            status = 409

        return jsonify(result), status

    @rest("/v2/evc/schedule", methods=["GET"])
    def list_schedules(self):
        """Endpoint to return all schedules stored for all circuits.

        Return a JSON with the following template:
        [{"schedule_id": <schedule_id>,
         "circuit_id": <circuit_id>,
         "schedule": <schedule object>}]
        """
        log.debug("list_schedules /v2/evc/schedule")
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
        if not circuits:
            result = {}
            status = 200
            return jsonify(result), status

        result = []
        status = 200
        for circuit in circuits:
            circuit_scheduler = circuit.get("circuit_scheduler")
            if circuit_scheduler:
                for scheduler in circuit_scheduler:
                    value = {
                        "schedule_id": scheduler.get("id"),
                        "circuit_id": circuit.get("id"),
                        "schedule": scheduler,
                    }
                    result.append(value)

        log.debug("list_schedules result %s %s", result, status)
        return jsonify(result), status

    @rest("/v2/evc/schedule/", methods=["POST"])
    def create_schedule(self):
        """
        Create a new schedule for a given circuit.

        This service does not check whether there are conflicts with other
        schedules.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }
        """
        log.debug("create_schedule /v2/evc/schedule/")

        json_data = self._json_from_request("create_schedule")
        try:
            circuit_id = json_data["circuit_id"]
        except TypeError as error:
            result = "The payload should have a dictionary."
            log.debug("create_schedule result %s %s", result, 400)
            raise BadRequest(result) from error
        except KeyError as error:
            result = "Missing circuit_id."
            log.debug("create_schedule result %s %s", result, 400)
            raise BadRequest(result) from error

        try:
            schedule_data = json_data["schedule"]
        except KeyError as error:
            result = "Missing schedule data."
            log.debug("create_schedule result %s %s", result, 400)
            raise BadRequest(result) from error

        # Get EVC from circuits buffer
        circuits = self._get_circuits_buffer()

        # get the circuit
        evc = circuits.get(circuit_id)

        # the circuit must exist
        if not evc:
            result = f"circuit_id {circuit_id} not found"
            log.debug("create_schedule result %s %s", result, 404)
            raise NotFound(result)
        # Archived (deleted) circuits cannot be modified
        if evc.archived:
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
            log.debug("create_schedule result %s %s", result, 403)
            raise Forbidden(result)

        # new schedule from dict
        new_schedule = CircuitSchedule.from_dict(schedule_data)

        # If there is no schedule, create the list
        if not evc.circuit_scheduler:
            evc.circuit_scheduler = []

        # Add the new schedule
        evc.circuit_scheduler.append(new_schedule)

        # Add schedule job
        self.sched.add_circuit_job(evc, new_schedule)

        # save circuit to mongodb
        evc.sync()

        result = new_schedule.as_dict()
        status = 201

        log.debug("create_schedule result %s %s", result, status)
        return jsonify(result), status

    @rest("/v2/evc/schedule/<schedule_id>", methods=["PATCH"])
    def update_schedule(self, schedule_id):
        """Update a schedule.

        Change all attributes of the given schedule of an EVC.
        The schedule ID is preserved by default.
        Payload example:
            {
              "date": "2019-08-07T14:52:10.967Z",
              "interval": "string",
              "frequency": "1 * * * *",
              "action": "create"
            }
        """
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)

        # Try to find a circuit schedule
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)

        # The schedule must exist, and archived circuits cannot be modified
        if not found_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("update_schedule result %s %s", result, 404)
            raise NotFound(result)
        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("update_schedule result %s %s", result, 403)
            raise Forbidden(result)

        data = self._json_from_request("update_schedule")

        new_schedule = CircuitSchedule.from_dict(data)
        new_schedule.id = found_schedule.id
        # Remove the old schedule
        evc.circuit_scheduler.remove(found_schedule)
        # Append the modified schedule
        evc.circuit_scheduler.append(new_schedule)

        # Cancel the job of the old schedule
        self.sched.cancel_job(found_schedule.id)
        # Add the new circuit schedule
        self.sched.add_circuit_job(evc, new_schedule)
        # Save EVC to mongodb
        evc.sync()

        result = new_schedule.as_dict()
        status = 200

        log.debug("update_schedule result %s %s", result, status)
        return jsonify(result), status

    @rest("/v2/evc/schedule/<schedule_id>", methods=["DELETE"])
    def delete_schedule(self, schedule_id):
        """Remove a circuit schedule.

        Remove the Schedule from EVC.
        Remove the Schedule from cron job.
        Save the EVC to mongodb.
        """
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)

        # The schedule must exist, and archived circuits cannot be modified
        if not found_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("delete_schedule result %s %s", result, 404)
            raise NotFound(result)

        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("delete_schedule result %s %s", result, 403)
            raise Forbidden(result)

        # Remove the old schedule
        evc.circuit_scheduler.remove(found_schedule)

        # Cancel the job of the removed schedule
        self.sched.cancel_job(found_schedule.id)
        # Save EVC to mongodb
        evc.sync()

        result = "Schedule removed"
        status = 200

        log.debug("delete_schedule result %s %s", result, status)
        return jsonify(result), status

    def _is_duplicated_evc(self, evc):
        """Check whether the given circuit duplicates a stored EVC.

        Args:
            evc (EVC): circuit to be analysed.

        Returns:
            boolean: True if the circuit is duplicated, otherwise False.

        """
        for circuit in tuple(self.circuits.values()):
            if not circuit.archived and circuit.shares_uni(evc):
                return True
        return False

    @listen_to("kytos/topology.link_up")
    def on_link_up(self, event):
        """Change circuit when link is up or end_maintenance."""
        self.handle_link_up(event)

    def handle_link_up(self, event):
        """Change circuit when link is up or end_maintenance."""
        log.debug("Event handle_link_up %s", event)
        for evc in self.get_evcs_by_svc_level():
            if evc.is_enabled() and not evc.archived:
                with evc.lock:
                    evc.handle_link_up(event.content["link"])

    @listen_to("kytos/topology.link_down")
    def on_link_down(self, event):
        """Change circuit when link is down or under_maintenance."""
        self.handle_link_down(event)

    def handle_link_down(self, event):
        """Change circuit when link is down or under_maintenance."""
        link = event.content["link"]
        log.info("Event handle_link_down %s", link)
        switch_flows = {}
        evcs_with_failover = []
        evcs_normal = []
        check_failover = []
        for evc in self.get_evcs_by_svc_level():
            if evc.is_affected_by_link(link):
                # if there is no usable failover path (missing or also
                # affected by the link), handle link down the traditional way
                if (
                    not getattr(evc, 'failover_path', None) or
                    evc.is_failover_path_affected_by_link(link)
                ):
                    evcs_normal.append(evc)
                    continue
                for dpid, flows in evc.get_failover_flows().items():
                    switch_flows.setdefault(dpid, [])
                    switch_flows[dpid].extend(flows)
                evcs_with_failover.append(evc)
            else:
                check_failover.append(evc)

        # Send the failover flows to each switch in rounds, pausing for
        # BATCH_INTERVAL between rounds. When BATCH_SIZE is 0, offset becomes
        # None and all flows go out in a single round.
        offset = 0
        while switch_flows:
            offset = (offset + settings.BATCH_SIZE) or None
            switches = list(switch_flows.keys())
            for dpid in switches:
                emit_event(
                    self.controller,
                    context="kytos.flow_manager",
                    name="flows.install",
                    dpid=dpid,
                    flow_dict={"flows": switch_flows[dpid][:offset]},
                )
                if offset is None or offset >= len(switch_flows[dpid]):
                    del switch_flows[dpid]
                    continue
                switch_flows[dpid] = switch_flows[dpid][offset:]
            time.sleep(settings.BATCH_INTERVAL)

        for evc in evcs_with_failover:
            with evc.lock:
                old_path = evc.current_path
                evc.current_path = evc.failover_path
                evc.failover_path = old_path
                evc.sync()
            emit_event(self.controller, "redeployed_link_down", evc_id=evc.id)
            log.info(
                f"{evc} redeployed with failover due to link down {link.id}"
            )

        for evc in evcs_normal:
            emit_event(
                self.controller,
                "evc_affected_by_link_down",
                evc_id=evc.id,
                link_id=link.id,
            )

        # After handling the hot path, check if new failover paths are needed.
        # Note that EVCs affected by link down will generate a KytosEvent for
        # deployed|redeployed, which will trigger the failover path setup.
        # Thus, we just need to further check the check_failover list
        for evc in check_failover:
            if evc.is_failover_path_affected_by_link(link):
                evc.setup_failover_path()

    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
    def on_evc_affected_by_link_down(self, event):
        """Change circuit when link is down or under_maintenance."""
        self.handle_evc_affected_by_link_down(event)

    def handle_evc_affected_by_link_down(self, event):
        """Change circuit when link is down or under_maintenance."""
        evc = self.circuits.get(event.content["evc_id"])
        link_id = event.content["link_id"]
        if not evc:
            return
        with evc.lock:
            result = evc.handle_link_down()
        event_name = "error_redeploy_link_down"
        if result:
            log.info(f"{evc} redeployed due to link down {link_id}")
            event_name = "redeployed_link_down"
        emit_event(self.controller, event_name, evc_id=evc.id)

    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
    def on_evc_deployed(self, event):
        """Handle EVC deployed|redeployed_link_(up|down)."""
        self.handle_evc_deployed(event)

    def handle_evc_deployed(self, event):
        """Set up the failover path once an EVC is deployed."""
        evc = self.circuits.get(event.content["evc_id"])
        if not evc:
            return
        with evc.lock:
            evc.setup_failover_path()
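
The listener name above is written as a pattern covering three event names. How `listen_to` matches names is not shown in this listing; the sketch below assumes the pattern is treated as a regular expression matched against the whole event name, and uses a hypothetical helper `matches` to show which names it would accept under that assumption.

```python
import re

# Pattern copied from the decorator above.
PATTERN = "kytos/mef_eline.(redeployed_link_(up|down)|deployed)"


def matches(event_name: str) -> bool:
    """Return True if the whole event name matches PATTERN.

    Assumption: full-string regex matching; the real dispatcher may differ.
    """
    return re.fullmatch(PATTERN, event_name) is not None
```

Under this assumption, `deployed`, `redeployed_link_up` and `redeployed_link_down` are accepted, while `error_redeploy_link_down` is not. The unescaped `.` also matches any single character, so the pattern is slightly more permissive than a literal dot.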

    @listen_to("kytos/topology.topology_loaded")
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
        """Load EVCs once the topology is available."""
        self.load_all_evcs()

    def load_all_evcs(self):
        """Try to load all EVCs on startup."""
        circuits = self.mongo_controller.get_circuits()["circuits"].items()
        for circuit_id, circuit in circuits:
            if circuit_id not in self.circuits:
                self._load_evc(circuit)

    def _load_evc(self, circuit_dict):
        """Load one EVC from mongodb to memory."""
        try:
            evc = self._evc_from_dict(circuit_dict)
        except ValueError as exception:
            log.error(
                f"Could not load EVC: dict={circuit_dict} error={exception}"
            )
            return None

        if evc.archived:
            return None
        evc.deactivate()
        evc.sync()
        self.circuits.setdefault(evc.id, evc)
        self.sched.add(evc)
        return evc

    @listen_to("kytos/flow_manager.flow.error")
    def on_flow_mod_error(self, event):
        """Handle flow mod errors related to an EVC."""
        self.handle_flow_mod_error(event)

    def handle_flow_mod_error(self, event):
        """Handle flow mod errors related to an EVC."""
        flow = event.content["flow"]
        command = event.content.get("error_command")
        if command != "add":
            return
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
        if evc:
            evc.remove_current_flows()

    def _evc_dict_with_instances(self, evc_dict):
        """Convert some dict values to instances of EVC classes.

        This method will convert: [UNI, CircuitSchedule, Link, Path]
        """
        data = evc_dict.copy()  # Do not modify the original dict
        for attribute, value in data.items():
            # Get multiple attributes.
            # Ex: uni_a, uni_z
            if "uni" in attribute:
                try:
                    data[attribute] = self._uni_from_dict(value)
                except ValueError as exception:
                    result = "Error creating UNI: Invalid value"
                    raise ValueError(result) from exception

            if attribute == "circuit_scheduler":
                data[attribute] = []
                for schedule in value:
                    data[attribute].append(CircuitSchedule.from_dict(schedule))

            # Get multiple attributes.
            # Ex: primary_links,
            #     backup_links,
            #     current_links_cache,
            #     primary_links_cache,
            #     backup_links_cache
            if "links" in attribute:
                data[attribute] = [
                    self._link_from_dict(link) for link in value
                ]

            # Ex: current_path,
            #     primary_path,
            #     backup_path
            if "path" in attribute and attribute != "dynamic_backup_path":
                data[attribute] = Path(
                    [self._link_from_dict(link) for link in value]
                )

        return data

    def _evc_from_dict(self, evc_dict):
        """Return an EVC object from python dict."""
        data = self._evc_dict_with_instances(evc_dict)
        return EVC(self.controller, **data)

    def _uni_from_dict(self, uni_dict):
        """Return a UNI object from python dict, or False if it is None."""
        if uni_dict is None:
            return False

        interface_id = uni_dict.get("interface_id")
        interface = self.controller.get_interface_by_id(interface_id)
        if interface is None:
            result = (
                "Error creating UNI: "
                f"Could not instantiate interface {interface_id}"
            )
            raise ValueError(result)

        tag_dict = uni_dict.get("tag", None)
        if tag_dict:
            tag = TAG.from_dict(tag_dict)
        else:
            tag = None
        uni = UNI(interface, tag)

        return uni

    def _link_from_dict(self, link_dict):
        """Return a Link object from python dict."""
        id_a = link_dict.get("endpoint_a").get("id")
        id_b = link_dict.get("endpoint_b").get("id")

        endpoint_a = self.controller.get_interface_by_id(id_a)
        endpoint_b = self.controller.get_interface_by_id(id_b)

        link = Link(endpoint_a, endpoint_b)
        if "metadata" in link_dict:
            link.extend_metadata(link_dict.get("metadata"))

        s_vlan = link.get_metadata("s_vlan")
        if s_vlan:
            tag = TAG.from_dict(s_vlan)
            if tag is False:
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
                raise ValueError(error_msg)
            link.update_metadata("s_vlan", tag)
        return link

    def _find_evc_by_schedule_id(self, schedule_id):
        """
        Find an EVC and CircuitSchedule based on schedule_id.

        :param schedule_id: Schedule ID
        :return: EVC and Schedule
        """
        circuits = self._get_circuits_buffer()
        found_schedule = None
        evc = None

        for circuit in circuits.values():
            for schedule in circuit.circuit_scheduler:
                if schedule.id == schedule_id:
                    found_schedule = schedule
                    evc = circuit
                    break
            if found_schedule:
                break
        return evc, found_schedule
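
The nested search above stops at the first schedule whose id matches and returns `(None, None)` when nothing is found. A generator expression combined with `next()` gives the same first-match behaviour without the two `break` statements. This is a standalone sketch: `Schedule`, `Circuit` and `find_by_schedule_id` are hypothetical stand-ins, not the classes used by this module.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple


@dataclass
class Schedule:
    """Hypothetical stand-in for a circuit schedule."""

    id: str


@dataclass
class Circuit:
    """Hypothetical stand-in for an EVC holding a list of schedules."""

    id: str
    circuit_scheduler: List[Schedule] = field(default_factory=list)


def find_by_schedule_id(
    circuits: Dict[str, Circuit], schedule_id: str
) -> Tuple[Optional[Circuit], Optional[Schedule]]:
    """Return the first (circuit, schedule) pair matching schedule_id."""
    candidates = (
        (circuit, schedule)
        for circuit in circuits.values()
        for schedule in circuit.circuit_scheduler
        if schedule.id == schedule_id
    )
    # next() consumes the generator lazily, so the search stops at the
    # first match; the default covers the "not found" case.
    return next(candidates, (None, None))
```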

    def _get_circuits_buffer(self):
        """
        Return the circuit buffer.

        If the buffer is empty, try to load data from mongodb.
        """
        if not self.circuits:
            # Load circuits from mongodb to buffer
            circuits = self.mongo_controller.get_circuits()["circuits"]
            for c_id, circuit in circuits.items():
                evc = self._evc_from_dict(circuit)
                self.circuits[c_id] = evc
        return self.circuits

    @staticmethod
    def _json_from_request(caller):
        """Return a json from request.

        If it was not possible to get a json from the request, log, for debug,
        who was the caller and the error that occurred, and raise an
        Exception.
        """
        try:
            json_data = request.get_json()
        except ValueError as exception:
            log.error(exception)
            log.debug(f"{caller} result {exception} 400")
            raise BadRequest(str(exception)) from exception
        except BadRequest as exception:
            result = "The request is not a valid JSON."
            log.debug(f"{caller} result {result} 400")
            raise BadRequest(result) from exception
        if json_data is None:
            result = "Content-Type must be application/json"
            log.debug(f"{caller} result {result} 415")
            raise UnsupportedMediaType(result)
        return json_data
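
When a parsing failure is translated into a different exception type, `raise ... from <caught exception>` keeps the original error reachable as `__cause__`, which helps when debugging from a traceback. The following is a minimal stdlib-only sketch of that idiom; `BadPayload` and `parse_payload` are hypothetical names and do not belong to this module or to any web framework.

```python
import json


class BadPayload(Exception):
    """Hypothetical error type standing in for an HTTP 400 exception."""


def parse_payload(raw: str):
    """Parse a JSON string, translating parse errors into BadPayload."""
    try:
        return json.loads(raw)
    except ValueError as exception:
        # json.JSONDecodeError is a ValueError subclass; chaining from the
        # caught instance preserves it as __cause__ on the new exception.
        raise BadPayload("The request is not a valid JSON.") from exception
```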