Passed
Pull Request — master (#258)
by
unknown
05:24 queued 01:48
created

build.main.Main.create_circuit()   C

Complexity

Conditions 11

Size

Total Lines 93
Code Lines 52

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 31
CRAP Score 12.7555

Importance

Changes 0
Metric Value
cc 11
eloc 52
nop 2
dl 0
loc 93
ccs 31
cts 41
cp 0.7561
crap 12.7555
rs 5.3509
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like build.main.Main.create_circuit() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import time
7 1
from threading import Lock
8
9 1
from flask import jsonify, request
10 1
from pydantic import ValidationError
11 1
from werkzeug.exceptions import (BadRequest, Conflict, Forbidden,
12
                                 MethodNotAllowed, NotFound,
13
                                 UnsupportedMediaType)
14
15 1
from kytos.core import KytosNApp, log, rest
16 1
from kytos.core.helpers import listen_to
17 1
from kytos.core.interface import TAG, UNI
18 1
from kytos.core.link import Link
19 1
from napps.kytos.mef_eline import controllers, settings
20 1
from napps.kytos.mef_eline.exceptions import InvalidPath
21 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
22
                                          Path)
23 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
24 1
from napps.kytos.mef_eline.utils import (emit_event, load_spec,
25
                                         map_evc_event_content, validate)
26
27
28
# pylint: disable=too-many-public-methods
29 1
class Main(KytosNApp):
30
    """Main class of amlight/mef_eline NApp.
31
32
    This class is the entry point for this napp.
33
    """
34
35 1
    spec = load_spec()
36
37 1
    def setup(self):
38
        """Replace the '__init__' method for the KytosNApp subclass.
39
40
        The setup method is automatically called by the controller when your
41
        application is loaded.
42
43
        So, if you have any setup routine, insert it here.
44
        """
45
        # object used to scheduler circuit events
46 1
        self.sched = Scheduler()
47
48
        # object to save and load circuits
49 1
        self.mongo_controller = self.get_eline_controller()
50 1
        self.mongo_controller.bootstrap_indexes()
51
52
        # set the controller that will manager the dynamic paths
53 1
        DynamicPathManager.set_controller(self.controller)
54
55
        # dictionary of EVCs created. It acts as a circuit buffer.
56
        # Every create/update/delete must be synced to mongodb.
57 1
        self.circuits = {}
58
59 1
        self._lock = Lock()
60 1
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)
61
62 1
        self.load_all_evcs()
63
64 1
    def get_evcs_by_svc_level(self) -> list:
65
        """Get circuits sorted by desc service level and asc creation_time.
66
67
        In the future, as more ops are offloaded it should be get from the DB.
68
        """
69 1
        return sorted(self.circuits.values(),
70
                      key=lambda x: (-x.service_level, x.creation_time))
71
72 1
    @staticmethod
73 1
    def get_eline_controller():
74
        """Return the ELineController instance."""
75
        return controllers.ELineController()
76
77 1
    def execute(self):
78
        """Execute once when the napp is running."""
79 1
        if self._lock.locked():
80 1
            return
81 1
        log.debug("Starting consistency routine")
82 1
        with self._lock:
83 1
            self.execute_consistency()
84 1
        log.debug("Finished consistency routine")
85
86 1
    def execute_consistency(self):
87
        """Execute consistency routine."""
88 1
        circuits_to_check = {}
89 1
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
90 1
        for circuit in self.get_evcs_by_svc_level():
91 1
            stored_circuits.pop(circuit.id, None)
92 1
            if (
93
                circuit.is_enabled()
94
                and not circuit.is_active()
95
                and not circuit.lock.locked()
96
                and not circuit.has_recent_removed_flow()
97
                and not circuit.is_recent_updated()
98
            ):
99 1
                circuits_to_check[circuit.id] = circuit
100 1
        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
101 1
        for circuit_id, circuit in circuits_to_check.items():
102 1
            is_checked = circuits_checked.get(circuit_id)
103 1
            if is_checked:
104 1
                circuit.execution_rounds = 0
105 1
                log.info(f"{circuit} enabled but inactive - activating")
106 1
                with circuit.lock:
107 1
                    circuit.activate()
108 1
                    circuit.sync()
109
            else:
110 1
                circuit.execution_rounds += 1
111 1
                if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
112 1
                    log.info(f"{circuit} enabled but inactive - redeploy")
113 1
                    with circuit.lock:
114 1
                        circuit.deploy()
115 1
        for circuit_id in stored_circuits:
116 1
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
117 1
            self._load_evc(stored_circuits[circuit_id])
118
119 1
    def shutdown(self):
120
        """Execute when your napp is unloaded.
121
122
        If you have some cleanup procedure, insert it here.
123
        """
124
125 1
    @rest("/v2/evc/", methods=["GET"])
126 1
    def list_circuits(self):
127
        """Endpoint to return circuits stored.
128
129
        archive query arg if defined (not null) will be filtered
130
        accordingly, by default only non archived evcs will be listed
131
        """
132 1
        log.debug("list_circuits /v2/evc")
133 1
        archived = request.args.get("archived", "false").lower()
134 1
        archived_to_optional = {"null": None, "true": True, "false": False}
135 1
        archived = archived_to_optional.get(archived, False)
136 1
        circuits = self.mongo_controller.get_circuits(archived=archived)
137 1
        circuits = circuits['circuits']
138 1
        return jsonify(circuits), 200
139
140 1
    @rest("/v2/evc/<circuit_id>", methods=["GET"])
141 1
    def get_circuit(self, circuit_id):
142
        """Endpoint to return a circuit based on id."""
143 1
        log.debug("get_circuit /v2/evc/%s", circuit_id)
144 1
        circuit = self.mongo_controller.get_circuit(circuit_id)
145 1
        if not circuit:
146 1
            result = f"circuit_id {circuit_id} not found"
147 1
            log.debug("get_circuit result %s %s", result, 404)
148 1
            raise NotFound(result)
149 1
        status = 200
150 1
        log.debug("get_circuit result %s %s", circuit, status)
151 1
        return jsonify(circuit), status
152
153 1
    @rest("/v2/evc/", methods=["POST"])
154 1
    @validate(spec)
155 1
    def create_circuit(self, data):
156
        """Try to create a new circuit.
157
158
        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
159
        UNI_Z's requested C-VID are available from the interfaces' pools. This
160
        is checked when creating the UNI object.
161
162
        Then, E-Line NApp requests a primary and a backup path to the
163
        Pathfinder NApp using the attributes primary_links and backup_links
164
        submitted via REST
165
166
        # For each link composing paths in #3:
167
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
168
        #  - Using the S-VID obtained, generate abstract flow entries to be
169
        #    sent to FlowManager
170
171
        Push abstract flow entries to FlowManager and FlowManager pushes
172
        OpenFlow entries to datapaths
173
174
        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
175
        creation
176
177
        Finnaly, notify user of the status of its request.
178
        """
179
        # Try to create the circuit object
180 1
        log.debug("create_circuit /v2/evc/")
181
182 1
        try:
183 1
            evc = self._evc_from_dict(data)
184 1
        except ValueError as exception:
185 1
            log.debug("create_circuit result %s %s", exception, 400)
186 1
            raise BadRequest(str(exception)) from BadRequest
187
188 1
        if evc.primary_path:
189
            try:
190
                evc.primary_path.is_valid(
191
                    evc.uni_a.interface.switch,
192
                    evc.uni_z.interface.switch,
193
                    bool(evc.circuit_scheduler),
194
                )
195
            except InvalidPath as exception:
196
                raise BadRequest(
197
                    f"primary_path is not valid: {exception}"
198
                ) from exception
199 1
        if evc.backup_path:
200
            try:
201
                evc.backup_path.is_valid(
202
                    evc.uni_a.interface.switch,
203
                    evc.uni_z.interface.switch,
204
                    bool(evc.circuit_scheduler),
205
                )
206
            except InvalidPath as exception:
207
                raise BadRequest(
208
                    f"backup_path is not valid: {exception}"
209
                ) from exception
210
211
        # verify duplicated evc
212 1
        if self._is_duplicated_evc(evc):
213 1
            result = "The EVC already exists."
214 1
            log.debug("create_circuit result %s %s", result, 409)
215 1
            raise Conflict(result)
216
217 1
        try:
218 1
            evc._validate_has_primary_or_dynamic()
219 1
        except ValueError as exception:
220 1
            raise BadRequest(str(exception)) from exception
221
222
        # save circuit
223 1
        try:
224 1
            evc.sync()
225
        except ValidationError as exception:
226
            raise BadRequest(str(exception)) from exception
227
228
        # store circuit in dictionary
229 1
        self.circuits[evc.id] = evc
230
231
        # Schedule the circuit deploy
232 1
        self.sched.add(evc)
233
234
        # Circuit has no schedule, deploy now
235 1
        if not evc.circuit_scheduler:
236 1
            with evc.lock:
237 1
                evc.deploy()
238
239
        # Notify users
240 1
        result = {"circuit_id": evc.id}
241 1
        status = 201
242 1
        log.debug("create_circuit result %s %s", result, status)
243 1
        emit_event(self.controller, name="created",
244
                   content=map_evc_event_content(evc))
245 1
        return jsonify(result), status
246
247 1
    @listen_to('kytos/flow_manager.flow.removed')
248 1
    def on_flow_delete(self, event):
249
        """Capture delete messages to keep track when flows got removed."""
250
        self.handle_flow_delete(event)
251
252 1
    def handle_flow_delete(self, event):
253
        """Keep track when the EVC got flows removed by deriving its cookie."""
254 1
        flow = event.content["flow"]
255 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
256 1
        if evc:
257 1
            log.debug("Flow removed in EVC %s", evc.id)
258 1
            evc.set_flow_removed_at()
259
260 1
    @rest("/v2/evc/<circuit_id>", methods=["PATCH"])
261 1
    def update(self, circuit_id):
262
        """Update a circuit based on payload.
263
264
        The EVC required attributes (name, uni_a, uni_z) can't be updated.
265
        """
266 1
        log.debug("update /v2/evc/%s", circuit_id)
267 1
        try:
268 1
            evc = self.circuits[circuit_id]
269 1
        except KeyError:
270 1
            result = f"circuit_id {circuit_id} not found"
271 1
            log.debug("update result %s %s", result, 404)
272 1
            raise NotFound(result) from NotFound
273
274 1
        if evc.archived:
275 1
            result = "Can't update archived EVC"
276 1
            log.debug("update result %s %s", result, 405)
277 1
            raise MethodNotAllowed(["GET"], result)
278
279 1
        try:
280 1
            data = request.get_json()
281 1
        except BadRequest:
282 1
            result = "The request body is not a well-formed JSON."
283 1
            log.debug("update result %s %s", result, 400)
284 1
            raise BadRequest(result) from BadRequest
285 1
        if data is None:
286 1
            result = "The request body mimetype is not application/json."
287 1
            log.debug("update result %s %s", result, 415)
288 1
            raise UnsupportedMediaType(result) from UnsupportedMediaType
289
290 1
        try:
291 1
            enable, redeploy = evc.update(
292
                **self._evc_dict_with_instances(data)
293
            )
294 1
        except ValueError as exception:
295 1
            log.error(exception)
296 1
            log.debug("update result %s %s", exception, 400)
297 1
            raise BadRequest(str(exception)) from BadRequest
298
299 1
        if evc.is_active():
300
            if enable is False:  # disable if active
301
                with evc.lock:
302
                    evc.remove()
303
            elif redeploy is not None:  # redeploy if active
304
                with evc.lock:
305
                    evc.remove()
306
                    evc.deploy()
307
        else:
308 1
            if enable is True:  # enable if inactive
309 1
                with evc.lock:
310 1
                    evc.deploy()
311 1
        result = {evc.id: evc.as_dict()}
312 1
        status = 200
313
314 1
        log.debug("update result %s %s", result, status)
315 1
        emit_event(self.controller, "updated",
316
                   content=map_evc_event_content(evc, **data))
317 1
        return jsonify(result), status
318
319 1
    @rest("/v2/evc/<circuit_id>", methods=["DELETE"])
320 1
    def delete_circuit(self, circuit_id):
321
        """Remove a circuit.
322
323
        First, the flows are removed from the switches, and then the EVC is
324
        disabled.
325
        """
326 1
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
327 1
        try:
328 1
            evc = self.circuits[circuit_id]
329 1
        except KeyError:
330 1
            result = f"circuit_id {circuit_id} not found"
331 1
            log.debug("delete_circuit result %s %s", result, 404)
332 1
            raise NotFound(result) from NotFound
333
334 1
        if evc.archived:
335 1
            result = f"Circuit {circuit_id} already removed"
336 1
            log.debug("delete_circuit result %s %s", result, 404)
337 1
            raise NotFound(result) from NotFound
338
339 1
        log.info("Removing %s", evc)
340 1
        with evc.lock:
341 1
            evc.remove_current_flows()
342 1
            evc.remove_failover_flows(sync=False)
343 1
            evc.deactivate()
344 1
            evc.disable()
345 1
            self.sched.remove(evc)
346 1
            evc.archive()
347 1
            evc.sync()
348 1
        log.info("EVC removed. %s", evc)
349 1
        result = {"response": f"Circuit {circuit_id} removed"}
350 1
        status = 200
351
352 1
        log.debug("delete_circuit result %s %s", result, status)
353 1
        emit_event(self.controller, "deleted",
354
                   content=map_evc_event_content(evc))
355 1
        return jsonify(result), status
356
357 1
    @rest("v2/evc/<circuit_id>/metadata", methods=["GET"])
358 1
    def get_metadata(self, circuit_id):
359
        """Get metadata from an EVC."""
360 1
        try:
361 1
            return (
362
                jsonify({"metadata": self.circuits[circuit_id].metadata}),
363
                200,
364
            )
365
        except KeyError as error:
366
            raise NotFound(f"circuit_id {circuit_id} not found.") from error
367
368 1
    @rest("v2/evc/<circuit_id>/metadata", methods=["POST"])
369 1
    def add_metadata(self, circuit_id):
370
        """Add metadata to an EVC."""
371 1
        try:
372 1
            metadata = request.get_json()
373 1
            content_type = request.content_type
374 1
        except BadRequest as error:
375 1
            result = "The request body is not a well-formed JSON."
376 1
            raise BadRequest(result) from error
377 1
        if content_type is None:
378 1
            result = "The request body is empty."
379 1
            raise BadRequest(result)
380 1
        if metadata is None:
381 1
            if content_type != "application/json":
382 1
                result = (
383
                    "The content type must be application/json "
384
                    f"(received {content_type})."
385
                )
386
            else:
387
                result = "Metadata is empty."
388 1
            raise UnsupportedMediaType(result)
389
390 1
        try:
391 1
            evc = self.circuits[circuit_id]
392 1
        except KeyError as error:
393 1
            raise NotFound(f"circuit_id {circuit_id} not found.") from error
394
395 1
        evc.extend_metadata(metadata)
396 1
        evc.sync()
397 1
        return jsonify("Operation successful"), 201
398
399 1
    @rest("v2/evc/<circuit_id>/metadata/<key>", methods=["DELETE"])
400 1
    def delete_metadata(self, circuit_id, key):
401
        """Delete metadata from an EVC."""
402 1
        try:
403 1
            evc = self.circuits[circuit_id]
404 1
        except KeyError as error:
405 1
            raise NotFound(f"circuit_id {circuit_id} not found.") from error
406
407 1
        evc.remove_metadata(key)
408 1
        evc.sync()
409 1
        return jsonify("Operation successful"), 200
410
411 1
    @rest("/v2/evc/<circuit_id>/redeploy", methods=["PATCH"])
412 1
    def redeploy(self, circuit_id):
413
        """Endpoint to force the redeployment of an EVC."""
414 1
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
415 1
        try:
416 1
            evc = self.circuits[circuit_id]
417 1
        except KeyError:
418 1
            result = f"circuit_id {circuit_id} not found"
419 1
            raise NotFound(result) from NotFound
420 1
        if evc.is_enabled():
421 1
            with evc.lock:
422 1
                evc.remove_current_flows()
423 1
                evc.deploy()
424 1
            result = {"response": f"Circuit {circuit_id} redeploy received."}
425 1
            status = 202
426
        else:
427 1
            result = {"response": f"Circuit {circuit_id} is disabled."}
428 1
            status = 409
429
430 1
        return jsonify(result), status
431
432 1
    @rest("/v2/evc/schedule", methods=["GET"])
433 1
    def list_schedules(self):
434
        """Endpoint to return all schedules stored for all circuits.
435
436
        Return a JSON with the following template:
437
        [{"schedule_id": <schedule_id>,
438
         "circuit_id": <circuit_id>,
439
         "schedule": <schedule object>}]
440
        """
441 1
        log.debug("list_schedules /v2/evc/schedule")
442 1
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
443 1
        if not circuits:
444 1
            result = {}
445 1
            status = 200
446 1
            return jsonify(result), status
447
448 1
        result = []
449 1
        status = 200
450 1
        for circuit in circuits:
451 1
            circuit_scheduler = circuit.get("circuit_scheduler")
452 1
            if circuit_scheduler:
453 1
                for scheduler in circuit_scheduler:
454 1
                    value = {
455
                        "schedule_id": scheduler.get("id"),
456
                        "circuit_id": circuit.get("id"),
457
                        "schedule": scheduler,
458
                    }
459 1
                    result.append(value)
460
461 1
        log.debug("list_schedules result %s %s", result, status)
462 1
        return jsonify(result), status
463
464 1
    @rest("/v2/evc/schedule/", methods=["POST"])
465 1
    def create_schedule(self):
466
        """
467
        Create a new schedule for a given circuit.
468
469
        This service do no check if there are conflicts with another schedule.
470
        Payload example:
471
            {
472
              "circuit_id":"aa:bb:cc",
473
              "schedule": {
474
                "date": "2019-08-07T14:52:10.967Z",
475
                "interval": "string",
476
                "frequency": "1 * * * *",
477
                "action": "create"
478
              }
479
            }
480
        """
481 1
        log.debug("create_schedule /v2/evc/schedule/")
482
483 1
        json_data = self._json_from_request("create_schedule")
484 1
        try:
485 1
            circuit_id = json_data["circuit_id"]
486 1
        except TypeError:
487 1
            result = "The payload should have a dictionary."
488 1
            log.debug("create_schedule result %s %s", result, 400)
489 1
            raise BadRequest(result) from BadRequest
490 1
        except KeyError:
491 1
            result = "Missing circuit_id."
492 1
            log.debug("create_schedule result %s %s", result, 400)
493 1
            raise BadRequest(result) from BadRequest
494
495 1
        try:
496 1
            schedule_data = json_data["schedule"]
497 1
        except KeyError:
498 1
            result = "Missing schedule data."
499 1
            log.debug("create_schedule result %s %s", result, 400)
500 1
            raise BadRequest(result) from BadRequest
501
502
        # Get EVC from circuits buffer
503 1
        circuits = self._get_circuits_buffer()
504
505
        # get the circuit
506 1
        evc = circuits.get(circuit_id)
507
508
        # get the circuit
509 1
        if not evc:
510 1
            result = f"circuit_id {circuit_id} not found"
511 1
            log.debug("create_schedule result %s %s", result, 404)
512 1
            raise NotFound(result) from NotFound
513
        # Can not modify circuits deleted and archived
514 1
        if evc.archived:
515 1
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
516 1
            log.debug("create_schedule result %s %s", result, 403)
517 1
            raise Forbidden(result) from Forbidden
518
519
        # new schedule from dict
520 1
        new_schedule = CircuitSchedule.from_dict(schedule_data)
521
522
        # If there is no schedule, create the list
523 1
        if not evc.circuit_scheduler:
524 1
            evc.circuit_scheduler = []
525
526
        # Add the new schedule
527 1
        evc.circuit_scheduler.append(new_schedule)
528
529
        # Add schedule job
530 1
        self.sched.add_circuit_job(evc, new_schedule)
531
532
        # save circuit to mongodb
533 1
        evc.sync()
534
535 1
        result = new_schedule.as_dict()
536 1
        status = 201
537
538 1
        log.debug("create_schedule result %s %s", result, status)
539 1
        return jsonify(result), status
540
541 1
    @rest("/v2/evc/schedule/<schedule_id>", methods=["PATCH"])
542 1
    def update_schedule(self, schedule_id):
543
        """Update a schedule.
544
545
        Change all attributes from the given schedule from a EVC circuit.
546
        The schedule ID is preserved as default.
547
        Payload example:
548
            {
549
              "date": "2019-08-07T14:52:10.967Z",
550
              "interval": "string",
551
              "frequency": "1 * * *",
552
              "action": "create"
553
            }
554
        """
555 1
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)
556
557
        # Try to find a circuit schedule
558 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
559
560
        # Can not modify circuits deleted and archived
561 1
        if not found_schedule:
562 1
            result = f"schedule_id {schedule_id} not found"
563 1
            log.debug("update_schedule result %s %s", result, 404)
564 1
            raise NotFound(result) from NotFound
565 1
        if evc.archived:
566 1
            result = f"Circuit {evc.id} is archived. Update is forbidden."
567 1
            log.debug("update_schedule result %s %s", result, 403)
568 1
            raise Forbidden(result) from Forbidden
569
570 1
        data = self._json_from_request("update_schedule")
571
572 1
        new_schedule = CircuitSchedule.from_dict(data)
573 1
        new_schedule.id = found_schedule.id
574
        # Remove the old schedule
575 1
        evc.circuit_scheduler.remove(found_schedule)
576
        # Append the modified schedule
577 1
        evc.circuit_scheduler.append(new_schedule)
578
579
        # Cancel all schedule jobs
580 1
        self.sched.cancel_job(found_schedule.id)
581
        # Add the new circuit schedule
582 1
        self.sched.add_circuit_job(evc, new_schedule)
583
        # Save EVC to mongodb
584 1
        evc.sync()
585
586 1
        result = new_schedule.as_dict()
587 1
        status = 200
588
589 1
        log.debug("update_schedule result %s %s", result, status)
590 1
        return jsonify(result), status
591
592 1
    @rest("/v2/evc/schedule/<schedule_id>", methods=["DELETE"])
593 1
    def delete_schedule(self, schedule_id):
594
        """Remove a circuit schedule.
595
596
        Remove the Schedule from EVC.
597
        Remove the Schedule from cron job.
598
        Save the EVC to the Storehouse.
599
        """
600 1
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
601 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
602
603
        # Can not modify circuits deleted and archived
604 1
        if not found_schedule:
605 1
            result = f"schedule_id {schedule_id} not found"
606 1
            log.debug("delete_schedule result %s %s", result, 404)
607 1
            raise NotFound(result)
608
609 1
        if evc.archived:
610 1
            result = f"Circuit {evc.id} is archived. Update is forbidden."
611 1
            log.debug("delete_schedule result %s %s", result, 403)
612 1
            raise Forbidden(result)
613
614
        # Remove the old schedule
615 1
        evc.circuit_scheduler.remove(found_schedule)
616
617
        # Cancel all schedule jobs
618 1
        self.sched.cancel_job(found_schedule.id)
619
        # Save EVC to mongodb
620 1
        evc.sync()
621
622 1
        result = "Schedule removed"
623 1
        status = 200
624
625 1
        log.debug("delete_schedule result %s %s", result, status)
626 1
        return jsonify(result), status
627
628 1
    def _is_duplicated_evc(self, evc):
629
        """Verify if the circuit given is duplicated with the stored evcs.
630
631
        Args:
632
            evc (EVC): circuit to be analysed.
633
634
        Returns:
635
            boolean: True if the circuit is duplicated, otherwise False.
636
637
        """
638 1
        for circuit in tuple(self.circuits.values()):
639 1
            if not circuit.archived and circuit.shares_uni(evc):
640 1
                return True
641 1
        return False
642
643 1
    @listen_to("kytos/topology.link_up")
644 1
    def on_link_up(self, event):
645
        """Change circuit when link is up or end_maintenance."""
646
        self.handle_link_up(event)
647
648 1
    def handle_link_up(self, event):
649
        """Change circuit when link is up or end_maintenance."""
650 1
        log.info("Event handle_link_up %s", event.content["link"])
651 1
        for evc in self.get_evcs_by_svc_level():
652 1
            if evc.is_enabled() and not evc.archived:
653 1
                with evc.lock:
654 1
                    evc.handle_link_up(event.content["link"])
655
656 1
    @listen_to("kytos/topology.link_down")
657 1
    def on_link_down(self, event):
658
        """Change circuit when link is down or under_mantenance."""
659
        self.handle_link_down(event)
660
661 1
    def handle_link_down(self, event):
662
        """Change circuit when link is down or under_mantenance."""
663 1
        link = event.content["link"]
664 1
        log.info("Event handle_link_down %s", link)
665 1
        switch_flows = {}
666 1
        evcs_with_failover = []
667 1
        evcs_normal = []
668 1
        check_failover = []
669 1
        for evc in self.get_evcs_by_svc_level():
670 1
            if evc.is_affected_by_link(link):
671
                # if there is no failover path, handles link down the
672
                # tradditional way
673 1
                if (
674
                    not getattr(evc, 'failover_path', None) or
675
                    evc.is_failover_path_affected_by_link(link)
676
                ):
677 1
                    evcs_normal.append(evc)
678 1
                    continue
679 1
                for dpid, flows in evc.get_failover_flows().items():
680 1
                    switch_flows.setdefault(dpid, [])
681 1
                    switch_flows[dpid].extend(flows)
682 1
                evcs_with_failover.append(evc)
683
            else:
684 1
                check_failover.append(evc)
685
686 1
        offset = 0
687 1
        while switch_flows:
688 1
            offset = (offset + settings.BATCH_SIZE) or None
689 1
            switches = list(switch_flows.keys())
690 1
            for dpid in switches:
691 1
                emit_event(
692
                    self.controller,
693
                    context="kytos.flow_manager",
694
                    name="flows.install",
695
                    content={
696
                        "dpid": dpid,
697
                        "flow_dict": {"flows": switch_flows[dpid][:offset]},
698
                    }
699
                )
700 1
                if offset is None or offset >= len(switch_flows[dpid]):
701 1
                    del switch_flows[dpid]
702 1
                    continue
703 1
                switch_flows[dpid] = switch_flows[dpid][offset:]
704 1
            time.sleep(settings.BATCH_INTERVAL)
705
706 1
        for evc in evcs_with_failover:
707 1
            with evc.lock:
708 1
                old_path = evc.current_path
709 1
                evc.current_path = evc.failover_path
710 1
                evc.failover_path = old_path
711 1
                evc.sync()
712 1
            emit_event(self.controller, "redeployed_link_down",
713
                       content=map_evc_event_content(evc))
714 1
            log.info(
715
                f"{evc} redeployed with failover due to link down {link.id}"
716
            )
717
718 1
        for evc in evcs_normal:
719 1
            emit_event(
720
                self.controller,
721
                "evc_affected_by_link_down",
722
                content={"link_id": link.id} | map_evc_event_content(evc)
723
            )
724
725
        # After handling the hot path, check if new failover paths are needed.
726
        # Note that EVCs affected by link down will generate a KytosEvent for
727
        # deployed|redeployed, which will trigger the failover path setup.
728
        # Thus, we just need to further check the check_failover list
729 1
        for evc in check_failover:
730 1
            if evc.is_failover_path_affected_by_link(link):
731 1
                evc.setup_failover_path()
732
733 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
734 1
    def on_evc_affected_by_link_down(self, event):
735
        """Change circuit when link is down or under_mantenance."""
736
        self.handle_evc_affected_by_link_down(event)
737
738 1
    def handle_evc_affected_by_link_down(self, event):
739
        """Change circuit when link is down or under_mantenance."""
740 1
        evc = self.circuits.get(event.content["evc_id"])
741 1
        link_id = event.content['link_id']
742 1
        if not evc:
743 1
            return
744 1
        with evc.lock:
745 1
            result = evc.handle_link_down()
746 1
        event_name = "error_redeploy_link_down"
747 1
        if result:
748 1
            log.info(f"{evc} redeployed due to link down {link_id}")
749 1
            event_name = "redeployed_link_down"
750 1
        emit_event(self.controller, event_name,
751
                   content=map_evc_event_content(evc))
752
753 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
754 1
    def on_evc_deployed(self, event):
755
        """Handle EVC deployed|redeployed_link_down."""
756
        self.handle_evc_deployed(event)
757
758 1
    def handle_evc_deployed(self, event):
759
        """Setup failover path on evc deployed."""
760 1
        evc = self.circuits.get(event.content["evc_id"])
761 1
        if not evc:
762 1
            return
763 1
        with evc.lock:
764 1
            evc.setup_failover_path()
765
766 1
    @listen_to("kytos/topology.topology_loaded")
767 1
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
768
        """Load EVCs once the topology is available."""
769
        self.load_all_evcs()
770
771 1
    def load_all_evcs(self):
772
        """Try to load all EVCs on startup."""
773 1
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
774 1
        for circuit_id, circuit in circuits:
775 1
            if circuit_id not in self.circuits:
776 1
                self._load_evc(circuit)
777
778 1
    def _load_evc(self, circuit_dict):
779
        """Load one EVC from mongodb to memory."""
780 1
        try:
781 1
            evc = self._evc_from_dict(circuit_dict)
782 1
        except ValueError as exception:
783 1
            log.error(
784
                f"Could not load EVC: dict={circuit_dict} error={exception}"
785
            )
786 1
            return None
787
788 1
        if evc.archived:
789 1
            return None
790 1
        evc.deactivate()
791 1
        evc.sync()
792 1
        self.circuits.setdefault(evc.id, evc)
793 1
        self.sched.add(evc)
794 1
        return evc
795
796 1
    @listen_to("kytos/flow_manager.flow.error")
797 1
    def on_flow_mod_error(self, event):
798
        """Handle flow mod errors related to an EVC."""
799
        self.handle_flow_mod_error(event)
800
801 1
    def handle_flow_mod_error(self, event):
802
        """Handle flow mod errors related to an EVC."""
803 1
        flow = event.content["flow"]
804 1
        command = event.content.get("error_command")
805 1
        if command != "add":
806
            return
807 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
808 1
        if evc:
809 1
            evc.remove_current_flows()
810
811 1
    def _evc_dict_with_instances(self, evc_dict):
812
        """Convert some dict values to instance of EVC classes.
813
814
        This method will convert: [UNI, Link]
815
        """
816 1
        data = evc_dict.copy()  # Do not modify the original dict
817 1
        for attribute, value in data.items():
818
            # Get multiple attributes.
819
            # Ex: uni_a, uni_z
820 1
            if "uni" in attribute:
821 1
                try:
822 1
                    data[attribute] = self._uni_from_dict(value)
823 1
                except ValueError as exception:
824 1
                    result = "Error creating UNI: Invalid value"
825 1
                    raise ValueError(result) from exception
826
827 1
            if attribute == "circuit_scheduler":
828 1
                data[attribute] = []
829 1
                for schedule in value:
830 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
831
832
            # Get multiple attributes.
833
            # Ex: primary_links,
834
            #     backup_links,
835
            #     current_links_cache,
836
            #     primary_links_cache,
837
            #     backup_links_cache
838 1
            if "links" in attribute:
839 1
                data[attribute] = [
840
                    self._link_from_dict(link) for link in value
841
                ]
842
843
            # Ex: current_path,
844
            #     primary_path,
845
            #     backup_path
846 1
            if "path" in attribute and attribute != "dynamic_backup_path":
847 1
                data[attribute] = Path(
848
                    [self._link_from_dict(link) for link in value]
849
                )
850
851 1
        return data
852
853 1
    def _evc_from_dict(self, evc_dict):
854 1
        data = self._evc_dict_with_instances(evc_dict)
855 1
        return EVC(self.controller, **data)
856
857 1
    def _uni_from_dict(self, uni_dict):
858
        """Return a UNI object from python dict."""
859 1
        if uni_dict is None:
860 1
            return False
861
862 1
        interface_id = uni_dict.get("interface_id")
863 1
        interface = self.controller.get_interface_by_id(interface_id)
864 1
        if interface is None:
865 1
            result = (
866
                "Error creating UNI:"
867
                + f"Could not instantiate interface {interface_id}"
868
            )
869 1
            raise ValueError(result) from ValueError
870
871 1
        tag_dict = uni_dict.get("tag", None)
872 1
        if tag_dict:
873 1
            tag = TAG.from_dict(tag_dict)
874
        else:
875 1
            tag = None
876 1
        uni = UNI(interface, tag)
877
878 1
        return uni
879
880 1
    def _link_from_dict(self, link_dict):
881
        """Return a Link object from python dict."""
882 1
        id_a = link_dict.get("endpoint_a").get("id")
883 1
        id_b = link_dict.get("endpoint_b").get("id")
884
885 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
886 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
887 1
        if not endpoint_a:
888 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
889 1
            raise ValueError(error_msg)
890 1
        if not endpoint_b:
891
            error_msg = f"Could not get interface endpoint_b id {id_b}"
892
            raise ValueError(error_msg)
893
894 1
        link = Link(endpoint_a, endpoint_b)
895 1
        if "metadata" in link_dict:
896 1
            link.extend_metadata(link_dict.get("metadata"))
897
898 1
        s_vlan = link.get_metadata("s_vlan")
899 1
        if s_vlan:
900 1
            tag = TAG.from_dict(s_vlan)
901 1
            if tag is False:
902
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
903
                raise ValueError(error_msg)
904 1
            link.update_metadata("s_vlan", tag)
905 1
        return link
906
907 1
    def _find_evc_by_schedule_id(self, schedule_id):
908
        """
909
        Find an EVC and CircuitSchedule based on schedule_id.
910
911
        :param schedule_id: Schedule ID
912
        :return: EVC and Schedule
913
        """
914 1
        circuits = self._get_circuits_buffer()
915 1
        found_schedule = None
916 1
        evc = None
917
918
        # pylint: disable=unused-variable
919 1
        for c_id, circuit in circuits.items():
920 1
            for schedule in circuit.circuit_scheduler:
921 1
                if schedule.id == schedule_id:
922 1
                    found_schedule = schedule
923 1
                    evc = circuit
924 1
                    break
925 1
            if found_schedule:
926 1
                break
927 1
        return evc, found_schedule
928
929 1
    def _get_circuits_buffer(self):
930
        """
931
        Return the circuit buffer.
932
933
        If the buffer is empty, try to load data from mongodb.
934
        """
935 1
        if not self.circuits:
936
            # Load circuits from mongodb to buffer
937 1
            circuits = self.mongo_controller.get_circuits()['circuits']
938 1
            for c_id, circuit in circuits.items():
939 1
                evc = self._evc_from_dict(circuit)
940 1
                self.circuits[c_id] = evc
941 1
        return self.circuits
942
943 1
    @staticmethod
944 1
    def _json_from_request(caller):
945
        """Return a json from request.
946
947
        If it was not possible to get a json from the request, log, for debug,
948
        who was the caller and the error that ocurred, and raise an
949
        Exception.
950
        """
951 1
        try:
952 1
            json_data = request.get_json()
953 1
        except ValueError as exception:
954
            log.error(exception)
955
            log.debug(f"{caller} result {exception} 400")
956
            raise BadRequest(str(exception)) from BadRequest
957 1
        except BadRequest:
958 1
            result = "The request is not a valid JSON."
959 1
            log.debug(f"{caller} result {result} 400")
960 1
            raise BadRequest(result) from BadRequest
961 1
        if json_data is None:
962 1
            result = "Content-Type must be application/json"
963 1
            log.debug(f"{caller} result {result} 415")
964 1
            raise UnsupportedMediaType(result)
965
        return json_data
966