Passed
Pull Request — master (#258)
by
unknown
03:54
created

build.main.Main.update()   F

Complexity

Conditions 14

Size

Total Lines 60
Code Lines 49

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 39
CRAP Score 14.9663

Importance

Changes 0
Metric Value
cc 14
eloc 49
nop 2
dl 0
loc 60
ccs 39
cts 47
cp 0.8298
crap 14.9663
rs 3.6
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

Complexity

Complex methods like build.main.Main.update() often do a lot of different things. To break such a method (or its enclosing class) down, we need to identify a cohesive component within it. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import time
7 1
from threading import Lock
8
9 1
from flask import jsonify, request
10 1
from pydantic import ValidationError
11 1
from werkzeug.exceptions import (BadRequest, Conflict, Forbidden,
12
                                 MethodNotAllowed, NotFound,
13
                                 UnsupportedMediaType)
14
15 1
from kytos.core import KytosNApp, log, rest
16 1
from kytos.core.helpers import listen_to
17 1
from kytos.core.interface import TAG, UNI
18 1
from kytos.core.link import Link
19 1
from napps.kytos.mef_eline import controllers, settings
20 1
from napps.kytos.mef_eline.exceptions import InvalidPath
21 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
22
                                          Path)
23 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
24 1
from napps.kytos.mef_eline.utils import (emit_event, load_spec,
25
                                         map_evc_event_content, validate)
26
27
28
# pylint: disable=too-many-public-methods
29 1
class Main(KytosNApp):
30
    """Main class of amlight/mef_eline NApp.
31
32
    This class is the entry point for this napp.
33
    """
34
35 1
    spec = load_spec()
36
37 1
    def setup(self):
        """Initialise the NApp state; stands in for ``__init__``.

        The controller calls this method automatically when the application
        is loaded, so every start-up routine belongs here.
        """
        # In-memory buffer of the EVCs handled by this NApp, keyed by EVC id.
        # Every create/update/delete must also be synced to MongoDB.
        self.circuits = {}

        # Guards the periodic consistency routine run by execute().
        self._lock = Lock()

        # Object used to schedule circuit events.
        self.sched = Scheduler()

        # Object used to save and load circuits.
        self.mongo_controller = self.get_eline_controller()
        self.mongo_controller.bootstrap_indexes()

        # Set the controller that will manage the dynamic paths.
        DynamicPathManager.set_controller(self.controller)

        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)
        self.load_all_evcs()
63
64 1
    def get_evcs_by_svc_level(self) -> list:
65
        """Get circuits sorted by desc service level and asc creation_time.
66
67
        In the future, as more ops are offloaded it should be get from the DB.
68
        """
69 1
        return sorted(self.circuits.values(),
70
                      key=lambda x: (-x.service_level, x.creation_time))
71
72 1
    @staticmethod
    def get_eline_controller():
        """Build and return an ``ELineController`` instance."""
        eline_controller = controllers.ELineController()
        return eline_controller
76
77 1
    def execute(self):
        """Run one round of the consistency routine.

        ``setup`` registers this method with ``execute_as_loop``, so it is
        invoked repeatedly. A round is skipped when a previous one still
        holds ``self._lock``.
        """
        # A non-blocking acquire checks and takes the lock atomically. The
        # former ``locked()`` test followed by ``with`` left a window where
        # another thread could grab the lock and make this call block.
        if not self._lock.acquire(blocking=False):
            return
        try:
            log.debug("Starting consistency routine")
            self.execute_consistency()
        finally:
            self._lock.release()
        log.debug("Finished consistency routine")
85
86 1
    @staticmethod
87 1
    def should_be_checked(circuit):
88
        "Verify if the circuit meets the necessary conditions to be checked"
89
        # pylint: disable=too-many-boolean-expressions
90 1
        if (
91
                circuit.is_enabled()
92
                and not circuit.is_active()
93
                and not circuit.lock.locked()
94
                and not circuit.has_recent_removed_flow()
95
                and not circuit.is_recent_updated()
96
                # if a inter-switch EVC does not have current_path, it does not
97
                # make sense to run sdntrace on it
98
                and (circuit.is_intra_switch() or circuit.current_path)
99
                ):
100 1
            return True
101
        return False
102
103 1
    def execute_consistency(self):
        """Reconcile the buffered circuits with their expected state.

        Circuits accepted by ``should_be_checked`` are traced in bulk. Those
        whose trace succeeds are activated and synced; the others have their
        ``execution_rounds`` counter increased and are redeployed once it
        exceeds ``settings.WAIT_FOR_OLD_PATH``. Finally, circuits present in
        MongoDB but missing from the buffer are loaded.
        """
        # Starts with every stored circuit; the ones already buffered are
        # dropped below, leaving only those that still need loading.
        unloaded = self.mongo_controller.get_circuits()['circuits']
        candidates = []
        for evc in self.get_evcs_by_svc_level():
            unloaded.pop(evc.id, None)
            if self.should_be_checked(evc):
                candidates.append(evc)

        trace_results = EVCDeploy.check_list_traces(candidates)
        for evc in candidates:
            if trace_results.get(evc.id):
                evc.execution_rounds = 0
                log.info(f"{evc} enabled but inactive - activating")
                with evc.lock:
                    evc.activate()
                    evc.sync()
                continue

            evc.execution_rounds += 1
            if evc.execution_rounds > settings.WAIT_FOR_OLD_PATH:
                log.info(f"{evc} enabled but inactive - redeploy")
                with evc.lock:
                    evc.deploy()

        for evc_id, stored_evc in unloaded.items():
            log.info(f"EVC found in mongodb but unloaded {evc_id}")
            self._load_evc(stored_evc)
129
130 1
    def shutdown(self):
        """Execute when the NApp is unloaded.

        Currently a no-op: the method body contains nothing but this
        docstring. Any cleanup procedure should be added here.
        """
135
136 1
    @rest("/v2/evc/", methods=["GET"])
    def list_circuits(self):
        """Endpoint to return the stored circuits.

        The ``archived`` query argument selects what is listed: ``"true"``,
        ``"false"`` or ``"null"`` (mapped to ``None``), compared
        case-insensitively. When it is absent or has any other value,
        ``False`` is used, so only non-archived EVCs are listed by default.
        """
        log.debug("list_circuits /v2/evc")
        flag_by_text = {"null": None, "true": True, "false": False}
        raw_flag = request.args.get("archived", "false").lower()
        archived = flag_by_text.get(raw_flag, False)
        stored = self.mongo_controller.get_circuits(archived=archived)
        return jsonify(stored['circuits']), 200
150
151 1
    @rest("/v2/evc/<circuit_id>", methods=["GET"])
    def get_circuit(self, circuit_id):
        """Endpoint to return the circuit identified by ``circuit_id``.

        Raises:
            NotFound: if the storage has no circuit with that id.
        """
        log.debug("get_circuit /v2/evc/%s", circuit_id)
        circuit = self.mongo_controller.get_circuit(circuit_id)
        if circuit:
            log.debug("get_circuit result %s %s", circuit, 200)
            return jsonify(circuit), 200

        message = f"circuit_id {circuit_id} not found"
        log.debug("get_circuit result %s %s", message, 404)
        raise NotFound(message)
163
164 1
    @rest("/v2/evc/", methods=["POST"])
    @validate(spec)
    def create_circuit(self, data):
        """Create, persist and (when unscheduled) deploy a new circuit.

        Steps performed:

        1. Build the EVC from ``data``.
        2. Validate ``primary_path`` and ``backup_path`` when provided.
        3. Reject the request if an equivalent EVC already exists.
        4. Check that the EVC has a primary path or is dynamic.
        5. Save the EVC, store it in the circuits buffer and add it to the
           scheduler.
        6. Deploy it right away when it has no ``circuit_scheduler``.
        7. Emit the ``created`` event and answer the request.

        Args:
            data: request payload, already validated against the spec.

        Returns:
            tuple: JSON response ``{"circuit_id": <id>}`` and status 201.

        Raises:
            BadRequest: if the EVC cannot be built from ``data``, a path is
                invalid, or saving fails validation.
            Conflict: if the EVC duplicates an existing one.
        """
        log.debug("create_circuit /v2/evc/")

        # Try to create the circuit object
        try:
            evc = self._evc_from_dict(data)
        except ValueError as exception:
            log.debug("create_circuit result %s %s", exception, 400)
            # Chain to the caught error (not to the BadRequest class) so the
            # traceback keeps the real cause.
            raise BadRequest(str(exception)) from exception

        # Both static paths go through the same validation; only the name
        # used in the error message differs.
        for path_name in ("primary_path", "backup_path"):
            path = getattr(evc, path_name)
            if not path:
                continue
            try:
                path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise BadRequest(
                    f"{path_name} is not valid: {exception}"
                ) from exception

        # verify duplicated evc
        if self._is_duplicated_evc(evc):
            result = "The EVC already exists."
            log.debug("create_circuit result %s %s", result, 409)
            raise Conflict(result)

        try:
            evc._validate_has_primary_or_dynamic()
        except ValueError as exception:
            raise BadRequest(str(exception)) from exception

        # save circuit
        try:
            evc.sync()
        except ValidationError as exception:
            raise BadRequest(str(exception)) from exception

        # store circuit in dictionary
        self.circuits[evc.id] = evc

        # Schedule the circuit deploy
        self.sched.add(evc)

        # Circuit has no schedule, deploy now
        if not evc.circuit_scheduler:
            with evc.lock:
                evc.deploy()

        # Notify users
        result = {"circuit_id": evc.id}
        status = 201
        log.debug("create_circuit result %s %s", result, status)
        emit_event(self.controller, name="created",
                   content=map_evc_event_content(evc))
        return jsonify(result), status
257
258 1
    @listen_to('kytos/flow_manager.flow.removed')
    def on_flow_delete(self, event):
        """Capture delete messages to keep track of when flows got removed.

        Listener for ``kytos/flow_manager.flow.removed``; it only forwards
        the event to ``handle_flow_delete``.
        """
        self.handle_flow_delete(event)
262
263 1
    def handle_flow_delete(self, event):
        """Record on the owning EVC that one of its flows was removed.

        The EVC id is derived from the cookie of ``event.content["flow"]``;
        nothing happens when no buffered EVC matches it.
        """
        removed_flow = event.content["flow"]
        evc_id = EVC.get_id_from_cookie(removed_flow.cookie)
        evc = self.circuits.get(evc_id)
        if not evc:
            return
        log.debug("Flow removed in EVC %s", evc.id)
        evc.set_flow_removed_at()
270
271 1
    @rest("/v2/evc/<circuit_id>", methods=["PATCH"])
    def update(self, circuit_id):
        """Update a circuit based on the request payload.

        The EVC required attributes (name, uni_a, uni_z) can't be updated.

        After the attributes are applied, the flows follow the outcome of
        ``evc.update``: an active EVC is removed when ``enable`` is False, or
        removed and deployed again when ``redeploy`` is not None; an inactive
        EVC is deployed when ``enable`` is True.

        Returns:
            tuple: JSON response ``{<evc id>: <evc as dict>}`` and status 200.

        Raises:
            NotFound: if ``circuit_id`` is not in the circuits buffer.
            MethodNotAllowed: if the EVC is archived.
            BadRequest: if the body is not well-formed JSON or the new
                values are rejected.
            UnsupportedMediaType: if the body is not ``application/json``.
        """
        log.debug("update /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as exception:
            result = f"circuit_id {circuit_id} not found"
            log.debug("update result %s %s", result, 404)
            # Chain to the caught error instead of the exception class so
            # the traceback keeps the real cause.
            raise NotFound(result) from exception

        if evc.archived:
            result = "Can't update archived EVC"
            log.debug("update result %s %s", result, 405)
            raise MethodNotAllowed(["GET"], result)

        try:
            data = request.get_json()
        except BadRequest as exception:
            result = "The request body is not a well-formed JSON."
            log.debug("update result %s %s", result, 400)
            raise BadRequest(result) from exception
        if data is None:
            result = "The request body mimetype is not application/json."
            log.debug("update result %s %s", result, 415)
            # No exception is being handled here, so there is nothing to
            # chain from.
            raise UnsupportedMediaType(result)

        try:
            enable, redeploy = evc.update(
                **self._evc_dict_with_instances(data)
            )
        except ValidationError as exception:
            raise BadRequest(str(exception)) from exception
        except ValueError as exception:
            log.error(exception)
            log.debug("update result %s %s", exception, 400)
            raise BadRequest(str(exception)) from exception

        if evc.is_active():
            if enable is False:  # disable if active
                with evc.lock:
                    evc.remove()
            elif redeploy is not None:  # redeploy if active
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        elif enable is True:  # enable if inactive
            with evc.lock:
                evc.deploy()

        result = {evc.id: evc.as_dict()}
        status = 200

        log.debug("update result %s %s", result, status)
        emit_event(self.controller, "updated",
                   content=map_evc_event_content(evc, **data))
        return jsonify(result), status
331
332 1
    @rest("/v2/evc/<circuit_id>", methods=["DELETE"])
    def delete_circuit(self, circuit_id):
        """Remove a circuit.

        Under the EVC lock: the current and failover flows are removed, the
        EVC is deactivated and disabled, taken out of the scheduler,
        archived and synced. The archived EVC stays in the circuits buffer.

        Returns:
            tuple: JSON confirmation message and status 200.

        Raises:
            NotFound: if ``circuit_id`` is unknown or the EVC is already
                archived.
        """
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as exception:
            result = f"circuit_id {circuit_id} not found"
            log.debug("delete_circuit result %s %s", result, 404)
            # Chain to the caught error instead of the exception class.
            raise NotFound(result) from exception

        if evc.archived:
            result = f"Circuit {circuit_id} already removed"
            log.debug("delete_circuit result %s %s", result, 404)
            # No exception is being handled here; nothing to chain from.
            raise NotFound(result)

        log.info("Removing %s", evc)
        with evc.lock:
            evc.remove_current_flows()
            evc.remove_failover_flows(sync=False)
            evc.deactivate()
            evc.disable()
            self.sched.remove(evc)
            evc.archive()
            evc.sync()
        log.info("EVC removed. %s", evc)
        result = {"response": f"Circuit {circuit_id} removed"}
        status = 200

        log.debug("delete_circuit result %s %s", result, status)
        emit_event(self.controller, "deleted",
                   content=map_evc_event_content(evc))
        return jsonify(result), status
369
370 1
    @rest("v2/evc/<circuit_id>/metadata", methods=["GET"])
    def get_metadata(self, circuit_id):
        """Return the metadata of the EVC identified by ``circuit_id``.

        Raises:
            NotFound: if a ``KeyError`` is raised while building the
                response, i.e. the circuit is not in the buffer.
        """
        try:
            payload = {"metadata": self.circuits[circuit_id].metadata}
            return jsonify(payload), 200
        except KeyError as error:
            raise NotFound(f"circuit_id {circuit_id} not found.") from error
380
381 1
    @rest("v2/evc/<circuit_id>/metadata", methods=["POST"])
    def add_metadata(self, circuit_id):
        """Add the metadata sent in the request body to an EVC.

        Raises:
            BadRequest: if the body is not well-formed JSON or the request
                has no content type.
            UnsupportedMediaType: if no JSON payload could be read (wrong
                content type or empty metadata).
            NotFound: if ``circuit_id`` is not in the circuits buffer.
        """
        try:
            metadata = request.get_json()
            content_type = request.content_type
        except BadRequest as error:
            message = "The request body is not a well-formed JSON."
            raise BadRequest(message) from error

        if content_type is None:
            raise BadRequest("The request body is empty.")

        if metadata is None:
            if content_type == "application/json":
                message = "Metadata is empty."
            else:
                message = (
                    "The content type must be application/json "
                    f"(received {content_type})."
                )
            raise UnsupportedMediaType(message)

        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise NotFound(f"circuit_id {circuit_id} not found.") from error

        evc.extend_metadata(metadata)
        evc.sync()
        return jsonify("Operation successful"), 201
411
412 1
    @rest("v2/evc/<circuit_id>/metadata/<key>", methods=["DELETE"])
    def delete_metadata(self, circuit_id, key):
        """Remove the metadata entry ``key`` from an EVC and sync it.

        Raises:
            NotFound: if ``circuit_id`` is not in the circuits buffer.
        """
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise NotFound(f"circuit_id {circuit_id} not found.") from error
        else:
            evc.remove_metadata(key)
            evc.sync()
        return jsonify("Operation successful"), 200
423
424 1
    @rest("/v2/evc/<circuit_id>/redeploy", methods=["PATCH"])
    def redeploy(self, circuit_id):
        """Endpoint to force the redeployment of an EVC.

        An enabled EVC has its current flows removed and is deployed again
        (status 202). A disabled EVC is left untouched (status 409).

        Raises:
            NotFound: if ``circuit_id`` is not in the circuits buffer.
        """
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as exception:
            result = f"circuit_id {circuit_id} not found"
            # Chain to the caught error instead of the exception class.
            raise NotFound(result) from exception

        if evc.is_enabled():
            with evc.lock:
                evc.remove_current_flows()
                evc.deploy()
            result = {"response": f"Circuit {circuit_id} redeploy received."}
            status = 202
        else:
            result = {"response": f"Circuit {circuit_id} is disabled."}
            status = 409

        return jsonify(result), status
444
445 1
    @rest("/v2/evc/schedule", methods=["GET"])
    def list_schedules(self):
        """Endpoint to return all schedules stored for all circuits.

        Return a JSON with the following template:
        [{"schedule_id": <schedule_id>,
         "circuit_id": <circuit_id>,
         "schedule": <schedule object>}]

        Note that when no circuit is stored at all, the response is an empty
        JSON object rather than an empty list.
        """
        log.debug("list_schedules /v2/evc/schedule")
        stored = self.mongo_controller.get_circuits()['circuits'].values()
        if not stored:
            return jsonify({}), 200

        status = 200
        # Circuits without a (non-empty) circuit_scheduler contribute nothing.
        result = [
            {
                "schedule_id": schedule.get("id"),
                "circuit_id": circuit.get("id"),
                "schedule": schedule,
            }
            for circuit in stored
            for schedule in circuit.get("circuit_scheduler") or ()
        ]

        log.debug("list_schedules result %s %s", result, status)
        return jsonify(result), status
476
477 1
    @rest("/v2/evc/schedule/", methods=["POST"])
    def create_schedule(self):
        """
        Create a new schedule for a given circuit.

        This service does not check if there are conflicts with another
        schedule.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }

        Returns:
            tuple: JSON with the new schedule as a dict and status 201.

        Raises:
            BadRequest: if the payload is not a dictionary or lacks
                ``circuit_id`` or ``schedule``.
            NotFound: if the circuit does not exist.
            Forbidden: if the circuit is archived.
        """
        log.debug("create_schedule /v2/evc/schedule/")

        json_data = self._json_from_request("create_schedule")
        try:
            circuit_id = json_data["circuit_id"]
        except TypeError as exception:
            result = "The payload should have a dictionary."
            log.debug("create_schedule result %s %s", result, 400)
            # Chain to the caught error instead of the exception class.
            raise BadRequest(result) from exception
        except KeyError as exception:
            result = "Missing circuit_id."
            log.debug("create_schedule result %s %s", result, 400)
            raise BadRequest(result) from exception

        try:
            schedule_data = json_data["schedule"]
        except KeyError as exception:
            result = "Missing schedule data."
            log.debug("create_schedule result %s %s", result, 400)
            raise BadRequest(result) from exception

        # Get the EVC from the circuits buffer
        evc = self._get_circuits_buffer().get(circuit_id)

        if not evc:
            result = f"circuit_id {circuit_id} not found"
            log.debug("create_schedule result %s %s", result, 404)
            # No exception is being handled here; nothing to chain from.
            raise NotFound(result)

        # Can not modify circuits deleted and archived
        if evc.archived:
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
            log.debug("create_schedule result %s %s", result, 403)
            raise Forbidden(result)

        # new schedule from dict
        new_schedule = CircuitSchedule.from_dict(schedule_data)

        # If there is no schedule, create the list
        if not evc.circuit_scheduler:
            evc.circuit_scheduler = []

        # Add the new schedule
        evc.circuit_scheduler.append(new_schedule)

        # Add schedule job
        self.sched.add_circuit_job(evc, new_schedule)

        # save circuit to mongodb
        evc.sync()

        result = new_schedule.as_dict()
        status = 201

        log.debug("create_schedule result %s %s", result, status)
        return jsonify(result), status
553
554 1
    @rest("/v2/evc/schedule/<schedule_id>", methods=["PATCH"])
    def update_schedule(self, schedule_id):
        """Update a schedule.

        Replace all attributes of the given schedule of an EVC with the ones
        from the payload. The schedule ID is preserved.
        Payload example:
            {
              "date": "2019-08-07T14:52:10.967Z",
              "interval": "string",
              "frequency": "1 * * *",
              "action": "create"
            }

        Returns:
            tuple: JSON with the updated schedule as a dict and status 200.

        Raises:
            NotFound: if no schedule matches ``schedule_id``.
            Forbidden: if the owning circuit is archived.
        """
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)

        # Try to find a circuit schedule
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)

        if not found_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("update_schedule result %s %s", result, 404)
            # No exception is being handled here; nothing to chain from.
            raise NotFound(result)

        # Can not modify circuits deleted and archived
        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("update_schedule result %s %s", result, 403)
            raise Forbidden(result)

        data = self._json_from_request("update_schedule")

        new_schedule = CircuitSchedule.from_dict(data)
        new_schedule.id = found_schedule.id
        # Remove the old schedule
        evc.circuit_scheduler.remove(found_schedule)
        # Append the modified schedule
        evc.circuit_scheduler.append(new_schedule)

        # Cancel the job of the old schedule
        self.sched.cancel_job(found_schedule.id)
        # Add the new circuit schedule
        self.sched.add_circuit_job(evc, new_schedule)
        # Save EVC to mongodb
        evc.sync()

        result = new_schedule.as_dict()
        status = 200

        log.debug("update_schedule result %s %s", result, status)
        return jsonify(result), status
604
605 1
    @rest("/v2/evc/schedule/<schedule_id>", methods=["DELETE"])
    def delete_schedule(self, schedule_id):
        """Remove a circuit schedule.

        The schedule is removed from its EVC, its job is cancelled in the
        scheduler and the EVC is synced.

        Raises:
            NotFound: if no schedule matches ``schedule_id``.
            Forbidden: if the owning circuit is archived.
        """
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
        evc, schedule = self._find_evc_by_schedule_id(schedule_id)

        if not schedule:
            message = f"schedule_id {schedule_id} not found"
            log.debug("delete_schedule result %s %s", message, 404)
            raise NotFound(message)

        # Circuits that were deleted and archived can not be modified.
        if evc.archived:
            message = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("delete_schedule result %s %s", message, 403)
            raise Forbidden(message)

        # Drop the schedule from the EVC and cancel its job.
        evc.circuit_scheduler.remove(schedule)
        self.sched.cancel_job(schedule.id)

        # Save the EVC to mongodb.
        evc.sync()

        log.debug("delete_schedule result %s %s", "Schedule removed", 200)
        return jsonify("Schedule removed"), 200
640
641 1
    def _is_duplicated_evc(self, evc):
642
        """Verify if the circuit given is duplicated with the stored evcs.
643
644
        Args:
645
            evc (EVC): circuit to be analysed.
646
647
        Returns:
648
            boolean: True if the circuit is duplicated, otherwise False.
649
650
        """
651 1
        for circuit in tuple(self.circuits.values()):
652 1
            if not circuit.archived and circuit.shares_uni(evc):
653 1
                return True
654 1
        return False
655
656 1
    @listen_to("kytos/topology.link_up")
    def on_link_up(self, event):
        """Change circuits when a link is up or ends maintenance.

        Listener for ``kytos/topology.link_up``; it only forwards the event
        to ``handle_link_up``.
        """
        self.handle_link_up(event)
660
661 1
    def handle_link_up(self, event):
        """Let every enabled, non-archived EVC react to a link coming up.

        EVCs are visited in service-level order and each one handles the
        link from ``event.content["link"]`` while holding its own lock.
        """
        link = event.content["link"]
        log.info("Event handle_link_up %s", link)
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_enabled() or evc.archived:
                continue
            with evc.lock:
                evc.handle_link_up(link)
668
669 1
    @listen_to("kytos/topology.link_down")
    def on_link_down(self, event):
        """Change circuits when a link is down or under maintenance.

        Listener for ``kytos/topology.link_down``; it only forwards the
        event to ``handle_link_down``.
        """
        self.handle_link_down(event)
673
674 1
    def handle_link_down(self, event):
        """Change circuits when a link is down or under maintenance.

        EVCs affected by the link are split in two groups:

        - those with a usable failover path: their failover flows are sent
          to ``kytos.flow_manager`` in batches, then ``current_path`` and
          ``failover_path`` are swapped and ``redeployed_link_down`` is
          emitted;
        - the others: an ``evc_affected_by_link_down`` event is emitted for
          each of them.

        EVCs not affected on their current path get a new failover path set
        up when only their failover path uses the link.
        """
        link = event.content["link"]
        log.info("Event handle_link_down %s", link)
        switch_flows = {}
        evcs_with_failover = []
        evcs_normal = []
        check_failover = []
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_affected_by_link(link):
                check_failover.append(evc)
                continue
            # if there is no failover path (or it is affected too), handle
            # the link down the traditional way
            if (
                not getattr(evc, 'failover_path', None) or
                evc.is_failover_path_affected_by_link(link)
            ):
                evcs_normal.append(evc)
                continue
            for dpid, flows in evc.get_failover_flows().items():
                switch_flows.setdefault(dpid, []).extend(flows)
            evcs_with_failover.append(evc)

        # Number of flows sent per switch on each round; a falsy BATCH_SIZE
        # means "send everything at once". The size is kept constant: the
        # flows already sent are sliced off below, so increasing it on every
        # round (as an accumulated offset would) makes the batches grow.
        batch_size = settings.BATCH_SIZE or None
        while switch_flows:
            for dpid in list(switch_flows):
                pending = switch_flows[dpid]
                emit_event(
                    self.controller,
                    context="kytos.flow_manager",
                    name="flows.install",
                    content={
                        "dpid": dpid,
                        "flow_dict": {"flows": pending[:batch_size]},
                    }
                )
                if batch_size is None or batch_size >= len(pending):
                    del switch_flows[dpid]
                    continue
                switch_flows[dpid] = pending[batch_size:]
            time.sleep(settings.BATCH_INTERVAL)

        for evc in evcs_with_failover:
            with evc.lock:
                old_path = evc.current_path
                evc.current_path = evc.failover_path
                evc.failover_path = old_path
                evc.sync()
            emit_event(self.controller, "redeployed_link_down",
                       content=map_evc_event_content(evc))
            log.info(
                f"{evc} redeployed with failover due to link down {link.id}"
            )

        for evc in evcs_normal:
            emit_event(
                self.controller,
                "evc_affected_by_link_down",
                content={"link_id": link.id} | map_evc_event_content(evc)
            )

        # After handling the hot path, check if new failover paths are needed.
        # Note that EVCs affected by link down will generate a KytosEvent for
        # deployed|redeployed, which will trigger the failover path setup.
        # Thus, we just need to further check the check_failover list
        for evc in check_failover:
            if evc.is_failover_path_affected_by_link(link):
                evc.setup_failover_path()
745
746 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
    def on_evc_affected_by_link_down(self, event):
        """Listener for ``kytos/mef_eline.evc_affected_by_link_down``.

        Simply forwards the received event to
        ``handle_evc_affected_by_link_down``.
        """
        self.handle_evc_affected_by_link_down(event)
750
751 1
    def handle_evc_affected_by_link_down(self, event):
        """Redeploy the EVC referenced by the event after a link went down.

        The event content must carry ``evc_id`` and ``link_id``. When the
        EVC is not present in ``self.circuits`` nothing else happens.
        Otherwise ``evc.handle_link_down()`` runs while holding ``evc.lock``
        and its outcome selects the emitted event: ``redeployed_link_down``
        for a truthy result, ``error_redeploy_link_down`` otherwise.
        """
        content = event.content
        evc = self.circuits.get(content["evc_id"])
        # Read before the guard below: a missing key raises KeyError even
        # when the EVC is unknown.
        link_id = content['link_id']
        if not evc:
            return

        with evc.lock:
            redeployed = evc.handle_link_down()

        if redeployed:
            log.info(f"{evc} redeployed due to link down {link_id}")
        event_name = (
            "redeployed_link_down" if redeployed
            else "error_redeploy_link_down"
        )
        emit_event(self.controller, event_name,
                   content=map_evc_event_content(evc))
765
766 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
    def on_evc_deployed(self, event):
        """Listener for EVC deployed / redeployed_link_up|down events.

        Simply forwards the received event to ``handle_evc_deployed``.
        """
        self.handle_evc_deployed(event)
770
771 1
    def handle_evc_deployed(self, event):
        """Set up the failover path of the EVC referenced by the event.

        The EVC is looked up in ``self.circuits`` by
        ``event.content["evc_id"]``; unknown EVCs are ignored. The setup
        runs while holding ``evc.lock``.
        """
        evc = self.circuits.get(event.content["evc_id"])
        if evc:
            with evc.lock:
                evc.setup_failover_path()
778
779 1
    @listen_to("kytos/topology.topology_loaded")
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
        """Listener for ``kytos/topology.topology_loaded``.

        The event itself is not inspected; it only triggers
        ``load_all_evcs``.
        """
        self.load_all_evcs()
783
784 1
    def load_all_evcs(self):
        """Load every stored circuit that is not yet in ``self.circuits``.

        Circuits come from ``self.mongo_controller.get_circuits()`` and each
        missing one is handed to ``_load_evc``.
        """
        stored = self.mongo_controller.get_circuits()['circuits']
        for circuit_id, circuit in stored.items():
            if circuit_id in self.circuits:
                continue
            self._load_evc(circuit)
790
791 1
    def _load_evc(self, circuit_dict):
792
        """Load one EVC from mongodb to memory."""
793 1
        try:
794 1
            evc = self._evc_from_dict(circuit_dict)
795 1
        except ValueError as exception:
796 1
            log.error(
797
                f"Could not load EVC: dict={circuit_dict} error={exception}"
798
            )
799 1
            return None
800
801 1
        if evc.archived:
802 1
            return None
803 1
        evc.deactivate()
804 1
        evc.sync()
805 1
        self.circuits.setdefault(evc.id, evc)
806 1
        self.sched.add(evc)
807 1
        return evc
808
809 1
    @listen_to("kytos/flow_manager.flow.error")
    def on_flow_mod_error(self, event):
        """Listener for ``kytos/flow_manager.flow.error``.

        Simply forwards the received event to ``handle_flow_mod_error``.
        """
        self.handle_flow_mod_error(event)
813
814 1
    def handle_flow_mod_error(self, event):
        """React to a flow mod error that belongs to an EVC.

        Only errors whose ``error_command`` is ``"add"`` are acted upon. The
        EVC id is derived from the flow cookie and, when that EVC is known,
        its current flows are removed.
        """
        flow = event.content["flow"]
        if event.content.get("error_command") != "add":
            return

        evc_id = EVC.get_id_from_cookie(flow.cookie)
        evc = self.circuits.get(evc_id)
        if evc:
            evc.remove_current_flows()
823
824 1
    def _evc_dict_with_instances(self, evc_dict):
825
        """Convert some dict values to instance of EVC classes.
826
827
        This method will convert: [UNI, Link]
828
        """
829 1
        data = evc_dict.copy()  # Do not modify the original dict
830 1
        for attribute, value in data.items():
831
            # Get multiple attributes.
832
            # Ex: uni_a, uni_z
833 1
            if "uni" in attribute:
834 1
                try:
835 1
                    data[attribute] = self._uni_from_dict(value)
836 1
                except ValueError as exception:
837 1
                    result = "Error creating UNI: Invalid value"
838 1
                    raise ValueError(result) from exception
839
840 1
            if attribute == "circuit_scheduler":
841 1
                data[attribute] = []
842 1
                for schedule in value:
843 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
844
845
            # Get multiple attributes.
846
            # Ex: primary_links,
847
            #     backup_links,
848
            #     current_links_cache,
849
            #     primary_links_cache,
850
            #     backup_links_cache
851 1
            if "links" in attribute:
852 1
                data[attribute] = [
853
                    self._link_from_dict(link) for link in value
854
                ]
855
856
            # Ex: current_path,
857
            #     primary_path,
858
            #     backup_path
859 1
            if "path" in attribute and attribute != "dynamic_backup_path":
860 1
                data[attribute] = Path(
861
                    [self._link_from_dict(link) for link in value]
862
                )
863
864 1
        return data
865
866 1
    def _evc_from_dict(self, evc_dict):
        """Return an EVC built from ``evc_dict``.

        The dict is first passed through ``_evc_dict_with_instances`` and
        the result is given to ``EVC`` as keyword arguments.
        """
        kwargs = self._evc_dict_with_instances(evc_dict)
        return EVC(self.controller, **kwargs)
869
870 1
    def _uni_from_dict(self, uni_dict):
871
        """Return a UNI object from python dict."""
872 1
        if uni_dict is None:
873 1
            return False
874
875 1
        interface_id = uni_dict.get("interface_id")
876 1
        interface = self.controller.get_interface_by_id(interface_id)
877 1
        if interface is None:
878 1
            result = (
879
                "Error creating UNI:"
880
                + f"Could not instantiate interface {interface_id}"
881
            )
882 1
            raise ValueError(result) from ValueError
883
884 1
        tag_dict = uni_dict.get("tag", None)
885 1
        if tag_dict:
886 1
            tag = TAG.from_dict(tag_dict)
887
        else:
888 1
            tag = None
889 1
        uni = UNI(interface, tag)
890
891 1
        return uni
892
893 1
    def _link_from_dict(self, link_dict):
894
        """Return a Link object from python dict."""
895 1
        id_a = link_dict.get("endpoint_a").get("id")
896 1
        id_b = link_dict.get("endpoint_b").get("id")
897
898 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
899 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
900 1
        if not endpoint_a:
901 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
902 1
            raise ValueError(error_msg)
903 1
        if not endpoint_b:
904
            error_msg = f"Could not get interface endpoint_b id {id_b}"
905
            raise ValueError(error_msg)
906
907 1
        link = Link(endpoint_a, endpoint_b)
908 1
        if "metadata" in link_dict:
909 1
            link.extend_metadata(link_dict.get("metadata"))
910
911 1
        s_vlan = link.get_metadata("s_vlan")
912 1
        if s_vlan:
913 1
            tag = TAG.from_dict(s_vlan)
914 1
            if tag is False:
915
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
916
                raise ValueError(error_msg)
917 1
            link.update_metadata("s_vlan", tag)
918 1
        return link
919
920 1
    def _find_evc_by_schedule_id(self, schedule_id):
921
        """
922
        Find an EVC and CircuitSchedule based on schedule_id.
923
924
        :param schedule_id: Schedule ID
925
        :return: EVC and Schedule
926
        """
927 1
        circuits = self._get_circuits_buffer()
928 1
        found_schedule = None
929 1
        evc = None
930
931
        # pylint: disable=unused-variable
932 1
        for c_id, circuit in circuits.items():
933 1
            for schedule in circuit.circuit_scheduler:
934 1
                if schedule.id == schedule_id:
935 1
                    found_schedule = schedule
936 1
                    evc = circuit
937 1
                    break
938 1
            if found_schedule:
939 1
                break
940 1
        return evc, found_schedule
941
942 1
    def _get_circuits_buffer(self):
943
        """
944
        Return the circuit buffer.
945
946
        If the buffer is empty, try to load data from mongodb.
947
        """
948 1
        if not self.circuits:
949
            # Load circuits from mongodb to buffer
950 1
            circuits = self.mongo_controller.get_circuits()['circuits']
951 1
            for c_id, circuit in circuits.items():
952 1
                evc = self._evc_from_dict(circuit)
953 1
                self.circuits[c_id] = evc
954 1
        return self.circuits
955
956 1
    @staticmethod
    def _json_from_request(caller):
        """Return the JSON payload of the current request.

        If it was not possible to get a json from the request, log, for
        debug, who was the caller and the error that occurred, and raise an
        exception.

        :param caller: name of the caller, used only in the log messages.
        :return: the value returned by ``request.get_json()``.
        :raises BadRequest: when ``request.get_json()`` raises
            ``ValueError`` or ``BadRequest``.
        :raises UnsupportedMediaType: when ``request.get_json()`` returns
            ``None``.
        """
        try:
            json_data = request.get_json()
        except ValueError as exception:
            log.error(exception)
            log.debug(f"{caller} result {exception} 400")
            # Chain from the caught exception (not the BadRequest class) so
            # the original error and traceback remain visible.
            raise BadRequest(str(exception)) from exception
        except BadRequest as exception:
            result = "The request is not a valid JSON."
            log.debug(f"{caller} result {result} 400")
            raise BadRequest(result) from exception
        if json_data is None:
            result = "Content-Type must be application/json"
            log.debug(f"{caller} result {result} 415")
            raise UnsupportedMediaType(result)
        return json_data
979