Passed
Push — master ( f1cfb0...17a0f1 )
by Vinicius
05:41 queued 03:32
created

build.main.Main.create_schedule()   B

Complexity

Conditions 7

Size

Total Lines 76
Code Lines 40

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 40
CRAP Score 7

Importance

Changes 0
Metric Value
cc 7
eloc 40
nop 1
dl 0
loc 76
ccs 40
cts 40
cp 1
crap 7
rs 7.52
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
# pylint: disable=protected-access
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import time
7 1
from threading import Lock
8
9 1
from flask import jsonify, request
10 1
from werkzeug.exceptions import (BadRequest, Conflict, Forbidden,
11
                                 MethodNotAllowed, NotFound,
12
                                 UnsupportedMediaType)
13
14 1
from kytos.core import KytosNApp, log, rest
15 1
from kytos.core.events import KytosEvent
16 1
from kytos.core.helpers import listen_to
17 1
from kytos.core.interface import TAG, UNI
18 1
from kytos.core.link import Link
19 1
from napps.kytos.mef_eline import controllers, settings
20 1
from napps.kytos.mef_eline.exceptions import InvalidPath
21 1
from napps.kytos.mef_eline.models import EVC, DynamicPathManager, Path
22 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
23 1
from napps.kytos.mef_eline.utils import emit_event, load_spec, validate
24
25
26
# pylint: disable=too-many-public-methods
27 1
class Main(KytosNApp):
28
    """Main class of amlight/mef_eline NApp.
29
30
    This class is the entry point for this napp.
31
    """
32
33 1
    spec = load_spec()
34
35 1
    def setup(self):
36
        """Replace the '__init__' method for the KytosNApp subclass.
37
38
        The setup method is automatically called by the controller when your
39
        application is loaded.
40
41
        So, if you have any setup routine, insert it here.
42
        """
43
        # object used to scheduler circuit events
44 1
        self.sched = Scheduler()
45
46
        # object to save and load circuits
47 1
        self.mongo_controller = self.get_eline_controller()
48 1
        self.mongo_controller.bootstrap_indexes()
49
50
        # set the controller that will manager the dynamic paths
51 1
        DynamicPathManager.set_controller(self.controller)
52
53
        # dictionary of EVCs created. It acts as a circuit buffer.
54
        # Every create/update/delete must be synced to mongodb.
55 1
        self.circuits = {}
56
57 1
        self._lock = Lock()
58 1
        self.execution_rounds = -1
59 1
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)
60
61 1
        self.load_all_evcs()
62
63 1
    @staticmethod
64 1
    def get_eline_controller():
65
        """Return the ELineController instance."""
66
        return controllers.ELineController()
67
68 1
    def execute(self):
69
        """Execute once when the napp is running."""
70 1
        if self.execution_rounds < 0:
71
            self.execution_rounds += 1
72
            return
73 1
        if self._lock.locked():
74 1
            return
75 1
        log.debug("Starting consistency routine")
76 1
        with self._lock:
77 1
            self.execute_consistency()
78 1
        log.debug("Finished consistency routine")
79
80 1
    def execute_consistency(self):
81
        """Execute consistency routine."""
82 1
        self.execution_rounds += 1
83 1
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
84 1
        for circuit in tuple(self.circuits.values()):
85 1
            stored_circuits.pop(circuit.id, None)
86 1
            if (
87
                circuit.is_enabled()
88
                and not circuit.is_active()
89
                and not circuit.lock.locked()
90
            ):
91 1
                if circuit.check_traces():
92 1
                    log.info(f"{circuit} enabled but inactive - activating")
93 1
                    with circuit.lock:
94 1
                        circuit.activate()
95 1
                        circuit.sync()
96
                else:
97 1
                    if self.execution_rounds > settings.WAIT_FOR_OLD_PATH:
98 1
                        log.info(f"{circuit} enabled but inactive - redeploy")
99 1
                        with circuit.lock:
100 1
                            circuit.deploy()
101 1
        for circuit_id in stored_circuits:
102 1
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
103 1
            self._load_evc(stored_circuits[circuit_id])
104
105 1
    def shutdown(self):
106
        """Execute when your napp is unloaded.
107
108
        If you have some cleanup procedure, insert it here.
109
        """
110
111 1
    @rest("/v2/evc/", methods=["GET"])
112 1
    def list_circuits(self):
113
        """Endpoint to return circuits stored.
114
115
        If archived is set to True return all circuits, else only the ones
116
        not archived.
117
        """
118 1
        log.debug("list_circuits /v2/evc")
119 1
        archived = request.args.get("archived", False)
120 1
        circuits = self.mongo_controller.get_circuits()['circuits']
121 1
        if not circuits:
122 1
            return jsonify({}), 200
123 1
        if archived:
124 1
            return jsonify(circuits), 200
125 1
        return (
126
            jsonify(
127
                {
128
                    circuit_id: circuit
129
                    for circuit_id, circuit in circuits.items()
130
                    if not circuit.get("archived", False)
131
                }
132
            ),
133
            200,
134
        )
135
136 1
    @rest("/v2/evc/<circuit_id>", methods=["GET"])
137 1
    def get_circuit(self, circuit_id):
138
        """Endpoint to return a circuit based on id."""
139 1
        log.debug("get_circuit /v2/evc/%s", circuit_id)
140 1
        circuit = self.mongo_controller.get_circuit(circuit_id)
141 1
        if not circuit:
142 1
            result = f"circuit_id {circuit_id} not found"
143 1
            log.debug("get_circuit result %s %s", result, 404)
144 1
            raise NotFound(result)
145 1
        status = 200
146 1
        log.debug("get_circuit result %s %s", circuit, status)
147 1
        return jsonify(circuit), status
148
149 1
    @rest("/v2/evc/", methods=["POST"])
150 1
    @validate(spec)
151 1
    def create_circuit(self, data):
152
        """Try to create a new circuit.
153
154
        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
155
        UNI_Z's requested C-VID are available from the interfaces' pools. This
156
        is checked when creating the UNI object.
157
158
        Then, E-Line NApp requests a primary and a backup path to the
159
        Pathfinder NApp using the attributes primary_links and backup_links
160
        submitted via REST
161
162
        # For each link composing paths in #3:
163
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
164
        #  - Using the S-VID obtained, generate abstract flow entries to be
165
        #    sent to FlowManager
166
167
        Push abstract flow entries to FlowManager and FlowManager pushes
168
        OpenFlow entries to datapaths
169
170
        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
171
        creation
172
173
        Finnaly, notify user of the status of its request.
174
        """
175
        # Try to create the circuit object
176 1
        log.debug("create_circuit /v2/evc/")
177
178 1
        try:
179 1
            evc = self._evc_from_dict(data)
180 1
        except ValueError as exception:
181 1
            log.debug("create_circuit result %s %s", exception, 400)
182 1
            raise BadRequest(str(exception)) from BadRequest
183
184 1
        if evc.primary_path:
185
            try:
186
                evc.primary_path.is_valid(
187
                    evc.uni_a.interface.switch,
188
                    evc.uni_z.interface.switch,
189
                    bool(evc.circuit_scheduler),
190
                )
191
            except InvalidPath as exception:
192
                raise BadRequest(
193
                    f"primary_path is not valid: {exception}"
194
                ) from exception
195 1
        if evc.backup_path:
196
            try:
197
                evc.backup_path.is_valid(
198
                    evc.uni_a.interface.switch,
199
                    evc.uni_z.interface.switch,
200
                    bool(evc.circuit_scheduler),
201
                )
202
            except InvalidPath as exception:
203
                raise BadRequest(
204
                    f"backup_path is not valid: {exception}"
205
                ) from exception
206
207
        # verify duplicated evc
208 1
        if self._is_duplicated_evc(evc):
209 1
            result = "The EVC already exists."
210 1
            log.debug("create_circuit result %s %s", result, 409)
211 1
            raise Conflict(result)
212
213 1
        if (
214
            not evc.primary_path
215
            and evc.dynamic_backup_path is False
216
            and evc.uni_a.interface.switch != evc.uni_z.interface.switch
217
        ):
218 1
            result = "The EVC must have a primary path or allow dynamic paths."
219 1
            log.debug("create_circuit result %s %s", result, 400)
220 1
            raise BadRequest(result)
221
222
        # store circuit in dictionary
223 1
        self.circuits[evc.id] = evc
224
225
        # save circuit
226 1
        evc.sync()
227
228
        # Schedule the circuit deploy
229 1
        self.sched.add(evc)
230
231
        # Circuit has no schedule, deploy now
232 1
        if not evc.circuit_scheduler:
233 1
            with evc.lock:
234 1
                evc.deploy()
235
236
        # Notify users
237 1
        event = KytosEvent(
238
            name="kytos.mef_eline.created", content=evc.as_dict()
239
        )
240 1
        self.controller.buffers.app.put(event)
241
242 1
        result = {"circuit_id": evc.id}
243 1
        status = 201
244 1
        log.debug("create_circuit result %s %s", result, status)
245 1
        emit_event(self.controller, "created", evc_id=evc.id)
246 1
        return jsonify(result), status
247
248 1
    @rest("/v2/evc/<circuit_id>", methods=["PATCH"])
249 1
    def update(self, circuit_id):
250
        """Update a circuit based on payload.
251
252
        The EVC required attributes (name, uni_a, uni_z) can't be updated.
253
        """
254 1
        log.debug("update /v2/evc/%s", circuit_id)
255 1
        try:
256 1
            evc = self.circuits[circuit_id]
257 1
        except KeyError:
258 1
            result = f"circuit_id {circuit_id} not found"
259 1
            log.debug("update result %s %s", result, 404)
260 1
            raise NotFound(result) from NotFound
261
262 1
        if evc.archived:
263 1
            result = "Can't update archived EVC"
264 1
            log.debug("update result %s %s", result, 405)
265 1
            raise MethodNotAllowed(["GET"], result)
266
267 1
        try:
268 1
            data = request.get_json()
269 1
        except BadRequest:
270 1
            result = "The request body is not a well-formed JSON."
271 1
            log.debug("update result %s %s", result, 400)
272 1
            raise BadRequest(result) from BadRequest
273 1
        if data is None:
274 1
            result = "The request body mimetype is not application/json."
275 1
            log.debug("update result %s %s", result, 415)
276 1
            raise UnsupportedMediaType(result) from UnsupportedMediaType
277
278 1
        try:
279 1
            enable, redeploy = evc.update(
280
                **self._evc_dict_with_instances(data)
281
            )
282 1
        except ValueError as exception:
283 1
            log.error(exception)
284 1
            log.debug("update result %s %s", exception, 400)
285 1
            raise BadRequest(str(exception)) from BadRequest
286
287 1
        if evc.is_active():
288
            if enable is False:  # disable if active
289
                with evc.lock:
290
                    evc.remove()
291
            elif redeploy is not None:  # redeploy if active
292
                with evc.lock:
293
                    evc.remove()
294
                    evc.deploy()
295
        else:
296 1
            if enable is True:  # enable if inactive
297 1
                with evc.lock:
298 1
                    evc.deploy()
299 1
        result = {evc.id: evc.as_dict()}
300 1
        status = 200
301
302 1
        log.debug("update result %s %s", result, status)
303 1
        emit_event(self.controller, "updated", evc_id=evc.id, data=data)
304 1
        return jsonify(result), status
305
306 1
    @rest("/v2/evc/<circuit_id>", methods=["DELETE"])
307 1
    def delete_circuit(self, circuit_id):
308
        """Remove a circuit.
309
310
        First, the flows are removed from the switches, and then the EVC is
311
        disabled.
312
        """
313 1
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
314 1
        try:
315 1
            evc = self.circuits[circuit_id]
316 1
        except KeyError:
317 1
            result = f"circuit_id {circuit_id} not found"
318 1
            log.debug("delete_circuit result %s %s", result, 404)
319 1
            raise NotFound(result) from NotFound
320
321 1
        if evc.archived:
322 1
            result = f"Circuit {circuit_id} already removed"
323 1
            log.debug("delete_circuit result %s %s", result, 404)
324 1
            raise NotFound(result) from NotFound
325
326 1
        log.info("Removing %s", evc)
327 1
        with evc.lock:
328 1
            evc.remove_current_flows()
329 1
            evc.deactivate()
330 1
            evc.disable()
331 1
            self.sched.remove(evc)
332 1
            evc.archive()
333 1
            evc.sync()
334 1
        log.info("EVC removed. %s", evc)
335 1
        result = {"response": f"Circuit {circuit_id} removed"}
336 1
        status = 200
337
338 1
        log.debug("delete_circuit result %s %s", result, status)
339 1
        emit_event(self.controller, "deleted", evc_id=evc.id)
340 1
        return jsonify(result), status
341
342 1
    @rest("v2/evc/<circuit_id>/metadata", methods=["GET"])
343 1
    def get_metadata(self, circuit_id):
344
        """Get metadata from an EVC."""
345 1
        try:
346 1
            return (
347
                jsonify({"metadata": self.circuits[circuit_id].metadata}),
348
                200,
349
            )
350
        except KeyError as error:
351
            raise NotFound(f"circuit_id {circuit_id} not found.") from error
352
353 1
    @rest("v2/evc/<circuit_id>/metadata", methods=["POST"])
354 1
    def add_metadata(self, circuit_id):
355
        """Add metadata to an EVC."""
356 1
        try:
357 1
            metadata = request.get_json()
358 1
            content_type = request.content_type
359 1
        except BadRequest as error:
360 1
            result = "The request body is not a well-formed JSON."
361 1
            raise BadRequest(result) from error
362 1
        if content_type is None:
363 1
            result = "The request body is empty."
364 1
            raise BadRequest(result)
365 1
        if metadata is None:
366 1
            if content_type != "application/json":
367 1
                result = (
368
                    "The content type must be application/json "
369
                    f"(received {content_type})."
370
                )
371
            else:
372
                result = "Metadata is empty."
373 1
            raise UnsupportedMediaType(result)
374
375 1
        try:
376 1
            evc = self.circuits[circuit_id]
377 1
        except KeyError as error:
378 1
            raise NotFound(f"circuit_id {circuit_id} not found.") from error
379
380 1
        evc.extend_metadata(metadata)
381 1
        evc.sync()
382 1
        return jsonify("Operation successful"), 201
383
384 1
    @rest("v2/evc/<circuit_id>/metadata/<key>", methods=["DELETE"])
385 1
    def delete_metadata(self, circuit_id, key):
386
        """Delete metadata from an EVC."""
387 1
        try:
388 1
            evc = self.circuits[circuit_id]
389 1
        except KeyError as error:
390 1
            raise NotFound(f"circuit_id {circuit_id} not found.") from error
391
392 1
        evc.remove_metadata(key)
393 1
        evc.sync()
394 1
        return jsonify("Operation successful"), 200
395
396 1
    @rest("/v2/evc/<circuit_id>/redeploy", methods=["PATCH"])
397 1
    def redeploy(self, circuit_id):
398
        """Endpoint to force the redeployment of an EVC."""
399 1
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
400 1
        try:
401 1
            evc = self.circuits[circuit_id]
402 1
        except KeyError:
403 1
            result = f"circuit_id {circuit_id} not found"
404 1
            raise NotFound(result) from NotFound
405 1
        if evc.is_enabled():
406 1
            with evc.lock:
407 1
                evc.remove_current_flows()
408 1
                evc.deploy()
409 1
            result = {"response": f"Circuit {circuit_id} redeploy received."}
410 1
            status = 202
411
        else:
412 1
            result = {"response": f"Circuit {circuit_id} is disabled."}
413 1
            status = 409
414
415 1
        return jsonify(result), status
416
417 1
    @rest("/v2/evc/schedule", methods=["GET"])
418 1
    def list_schedules(self):
419
        """Endpoint to return all schedules stored for all circuits.
420
421
        Return a JSON with the following template:
422
        [{"schedule_id": <schedule_id>,
423
         "circuit_id": <circuit_id>,
424
         "schedule": <schedule object>}]
425
        """
426 1
        log.debug("list_schedules /v2/evc/schedule")
427 1
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
428 1
        if not circuits:
429 1
            result = {}
430 1
            status = 200
431 1
            return jsonify(result), status
432
433 1
        result = []
434 1
        status = 200
435 1
        for circuit in circuits:
436 1
            circuit_scheduler = circuit.get("circuit_scheduler")
437 1
            if circuit_scheduler:
438 1
                for scheduler in circuit_scheduler:
439 1
                    value = {
440
                        "schedule_id": scheduler.get("id"),
441
                        "circuit_id": circuit.get("id"),
442
                        "schedule": scheduler,
443
                    }
444 1
                    result.append(value)
445
446 1
        log.debug("list_schedules result %s %s", result, status)
447 1
        return jsonify(result), status
448
449 1
    @rest("/v2/evc/schedule/", methods=["POST"])
450 1
    def create_schedule(self):
451
        """
452
        Create a new schedule for a given circuit.
453
454
        This service do no check if there are conflicts with another schedule.
455
        Payload example:
456
            {
457
              "circuit_id":"aa:bb:cc",
458
              "schedule": {
459
                "date": "2019-08-07T14:52:10.967Z",
460
                "interval": "string",
461
                "frequency": "1 * * * *",
462
                "action": "create"
463
              }
464
            }
465
        """
466 1
        log.debug("create_schedule /v2/evc/schedule/")
467
468 1
        json_data = self._json_from_request("create_schedule")
469 1
        try:
470 1
            circuit_id = json_data["circuit_id"]
471 1
        except TypeError:
472 1
            result = "The payload should have a dictionary."
473 1
            log.debug("create_schedule result %s %s", result, 400)
474 1
            raise BadRequest(result) from BadRequest
475 1
        except KeyError:
476 1
            result = "Missing circuit_id."
477 1
            log.debug("create_schedule result %s %s", result, 400)
478 1
            raise BadRequest(result) from BadRequest
479
480 1
        try:
481 1
            schedule_data = json_data["schedule"]
482 1
        except KeyError:
483 1
            result = "Missing schedule data."
484 1
            log.debug("create_schedule result %s %s", result, 400)
485 1
            raise BadRequest(result) from BadRequest
486
487
        # Get EVC from circuits buffer
488 1
        circuits = self._get_circuits_buffer()
489
490
        # get the circuit
491 1
        evc = circuits.get(circuit_id)
492
493
        # get the circuit
494 1
        if not evc:
495 1
            result = f"circuit_id {circuit_id} not found"
496 1
            log.debug("create_schedule result %s %s", result, 404)
497 1
            raise NotFound(result) from NotFound
498
        # Can not modify circuits deleted and archived
499 1
        if evc.archived:
500 1
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
501 1
            log.debug("create_schedule result %s %s", result, 403)
502 1
            raise Forbidden(result) from Forbidden
503
504
        # new schedule from dict
505 1
        new_schedule = CircuitSchedule.from_dict(schedule_data)
506
507
        # If there is no schedule, create the list
508 1
        if not evc.circuit_scheduler:
509 1
            evc.circuit_scheduler = []
510
511
        # Add the new schedule
512 1
        evc.circuit_scheduler.append(new_schedule)
513
514
        # Add schedule job
515 1
        self.sched.add_circuit_job(evc, new_schedule)
516
517
        # save circuit to mongodb
518 1
        evc.sync()
519
520 1
        result = new_schedule.as_dict()
521 1
        status = 201
522
523 1
        log.debug("create_schedule result %s %s", result, status)
524 1
        return jsonify(result), status
525
526 1
    @rest("/v2/evc/schedule/<schedule_id>", methods=["PATCH"])
527 1
    def update_schedule(self, schedule_id):
528
        """Update a schedule.
529
530
        Change all attributes from the given schedule from a EVC circuit.
531
        The schedule ID is preserved as default.
532
        Payload example:
533
            {
534
              "date": "2019-08-07T14:52:10.967Z",
535
              "interval": "string",
536
              "frequency": "1 * * *",
537
              "action": "create"
538
            }
539
        """
540 1
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)
541
542
        # Try to find a circuit schedule
543 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
544
545
        # Can not modify circuits deleted and archived
546 1
        if not found_schedule:
547 1
            result = f"schedule_id {schedule_id} not found"
548 1
            log.debug("update_schedule result %s %s", result, 404)
549 1
            raise NotFound(result) from NotFound
550 1
        if evc.archived:
551 1
            result = f"Circuit {evc.id} is archived. Update is forbidden."
552 1
            log.debug("update_schedule result %s %s", result, 403)
553 1
            raise Forbidden(result) from Forbidden
554
555 1
        data = self._json_from_request("update_schedule")
556
557 1
        new_schedule = CircuitSchedule.from_dict(data)
558 1
        new_schedule.id = found_schedule.id
559
        # Remove the old schedule
560 1
        evc.circuit_scheduler.remove(found_schedule)
561
        # Append the modified schedule
562 1
        evc.circuit_scheduler.append(new_schedule)
563
564
        # Cancel all schedule jobs
565 1
        self.sched.cancel_job(found_schedule.id)
566
        # Add the new circuit schedule
567 1
        self.sched.add_circuit_job(evc, new_schedule)
568
        # Save EVC to mongodb
569 1
        evc.sync()
570
571 1
        result = new_schedule.as_dict()
572 1
        status = 200
573
574 1
        log.debug("update_schedule result %s %s", result, status)
575 1
        return jsonify(result), status
576
577 1
    @rest("/v2/evc/schedule/<schedule_id>", methods=["DELETE"])
578 1
    def delete_schedule(self, schedule_id):
579
        """Remove a circuit schedule.
580
581
        Remove the Schedule from EVC.
582
        Remove the Schedule from cron job.
583
        Save the EVC to the Storehouse.
584
        """
585 1
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
586 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
587
588
        # Can not modify circuits deleted and archived
589 1
        if not found_schedule:
590 1
            result = f"schedule_id {schedule_id} not found"
591 1
            log.debug("delete_schedule result %s %s", result, 404)
592 1
            raise NotFound(result)
593
594 1
        if evc.archived:
595 1
            result = f"Circuit {evc.id} is archived. Update is forbidden."
596 1
            log.debug("delete_schedule result %s %s", result, 403)
597 1
            raise Forbidden(result)
598
599
        # Remove the old schedule
600 1
        evc.circuit_scheduler.remove(found_schedule)
601
602
        # Cancel all schedule jobs
603 1
        self.sched.cancel_job(found_schedule.id)
604
        # Save EVC to mongodb
605 1
        evc.sync()
606
607 1
        result = "Schedule removed"
608 1
        status = 200
609
610 1
        log.debug("delete_schedule result %s %s", result, status)
611 1
        return jsonify(result), status
612
613 1
    def _is_duplicated_evc(self, evc):
614
        """Verify if the circuit given is duplicated with the stored evcs.
615
616
        Args:
617
            evc (EVC): circuit to be analysed.
618
619
        Returns:
620
            boolean: True if the circuit is duplicated, otherwise False.
621
622
        """
623 1
        for circuit in tuple(self.circuits.values()):
624 1
            if not circuit.archived and circuit.shares_uni(evc):
625 1
                return True
626 1
        return False
627
628 1
    @listen_to("kytos/topology.link_up")
629 1
    def on_link_up(self, event):
630
        """Change circuit when link is up or end_maintenance."""
631
        self.handle_link_up(event)
632
633 1
    def handle_link_up(self, event):
634
        """Change circuit when link is up or end_maintenance."""
635 1
        log.debug("Event handle_link_up %s", event)
636 1
        for evc in self.circuits.copy().values():
637 1
            if evc.is_enabled() and not evc.archived:
638 1
                with evc.lock:
639 1
                    evc.handle_link_up(event.content["link"])
640
641 1
    @listen_to("kytos/topology.link_down")
642 1
    def on_link_down(self, event):
643
        """Change circuit when link is down or under_mantenance."""
644
        self.handle_link_down(event)
645
646 1
    def handle_link_down(self, event):
647
        """Change circuit when link is down or under_mantenance."""
648 1
        link = event.content["link"]
649 1
        log.info("Event handle_link_down %s", link)
650 1
        switch_flows = {}
651 1
        evcs_with_failover = []
652 1
        evcs_normal = []
653 1
        check_failover = []
654 1
        for evc in self.circuits.copy().values():
655 1
            if evc.is_affected_by_link(link):
656
                # if there is no failover path, handles link down the
657
                # tradditional way
658 1
                if (
659
                    not getattr(evc, 'failover_path', None) or
660
                    evc.is_failover_path_affected_by_link(link)
661
                ):
662 1
                    evcs_normal.append(evc)
663 1
                    continue
664 1
                for dpid, flows in evc.get_failover_flows().items():
665 1
                    switch_flows.setdefault(dpid, [])
666 1
                    switch_flows[dpid].extend(flows)
667 1
                evcs_with_failover.append(evc)
668
            else:
669 1
                check_failover.append(evc)
670
671 1
        offset = 0
672 1
        while switch_flows:
673 1
            offset = (offset + settings.BATCH_SIZE) or None
674 1
            switches = list(switch_flows.keys())
675 1
            for dpid in switches:
676 1
                emit_event(
677
                    self.controller,
678
                    context="kytos.flow_manager",
679
                    name="flows.install",
680
                    dpid=dpid,
681
                    flow_dict={"flows": switch_flows[dpid][:offset]},
682
                )
683 1
                if offset is None or offset >= len(switch_flows[dpid]):
684 1
                    del switch_flows[dpid]
685 1
                    continue
686 1
                switch_flows[dpid] = switch_flows[dpid][offset:]
687 1
            time.sleep(settings.BATCH_INTERVAL)
688
689 1
        for evc in evcs_with_failover:
690 1
            with evc.lock:
691 1
                old_path = evc.current_path
692 1
                evc.current_path = evc.failover_path
693 1
                evc.failover_path = old_path
694 1
                evc.sync()
695 1
            emit_event(self.controller, "redeployed_link_down", evc_id=evc.id)
696 1
            log.info(
697
                f"{evc} redeployed with failover due to link down {link.id}"
698
            )
699
700 1
        for evc in evcs_normal:
701 1
            emit_event(
702
                self.controller,
703
                "evc_affected_by_link_down",
704
                evc_id=evc.id,
705
                link_id=link.id,
706
            )
707
708
        # After handling the hot path, check if new failover paths are needed.
709
        # Note that EVCs affected by link down will generate a KytosEvent for
710
        # deployed|redeployed, which will trigger the failover path setup.
711
        # Thus, we just need to further check the check_failover list
712 1
        for evc in check_failover:
713 1
            if evc.is_failover_path_affected_by_link(link):
714 1
                evc.setup_failover_path()
715
716 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
717 1
    def on_evc_affected_by_link_down(self, event):
718
        """Change circuit when link is down or under_mantenance."""
719
        self.handle_evc_affected_by_link_down(event)
720
721 1
    def handle_evc_affected_by_link_down(self, event):
722
        """Change circuit when link is down or under_mantenance."""
723 1
        evc = self.circuits.get(event.content["evc_id"])
724 1
        link_id = event.content['link_id']
725 1
        if not evc:
726 1
            return
727 1
        with evc.lock:
728 1
            result = evc.handle_link_down()
729 1
        event_name = "error_redeploy_link_down"
730 1
        if result:
731 1
            log.info(f"{evc} redeployed due to link down {link_id}")
732 1
            event_name = "redeployed_link_down"
733 1
        emit_event(self.controller, event_name, evc_id=evc.id)
734
735 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
736 1
    def on_evc_deployed(self, event):
737
        """Handle EVC deployed|redeployed_link_down."""
738
        self.handle_evc_deployed(event)
739
740 1
    def handle_evc_deployed(self, event):
741
        """Setup failover path on evc deployed."""
742 1
        evc = self.circuits.get(event.content["evc_id"])
743 1
        if not evc:
744 1
            return
745 1
        with evc.lock:
746 1
            evc.setup_failover_path()
747
748 1
    @listen_to("kytos/topology.topology_loaded")
749 1
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
750
        """Load EVCs once the topology is available."""
751
        self.load_all_evcs()
752
753 1
    def load_all_evcs(self):
754
        """Try to load all EVCs on startup."""
755 1
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
756 1
        for circuit_id, circuit in circuits:
757 1
            if circuit_id not in self.circuits:
758 1
                self._load_evc(circuit)
759
760 1
    def _load_evc(self, circuit_dict):
761
        """Load one EVC from mongodb to memory."""
762 1
        try:
763 1
            evc = self._evc_from_dict(circuit_dict)
764 1
        except ValueError as exception:
765 1
            log.error(
766
                f"Could not load EVC: dict={circuit_dict} error={exception}"
767
            )
768 1
            return None
769
770 1
        if evc.archived:
771 1
            return None
772 1
        evc.deactivate()
773 1
        evc.sync()
774 1
        self.circuits.setdefault(evc.id, evc)
775 1
        self.sched.add(evc)
776 1
        return evc
777
778 1
    @listen_to("kytos/flow_manager.flow.error")
779 1
    def on_flow_mod_error(self, event):
780
        """Handle flow mod errors related to an EVC."""
781
        self.handle_flow_mod_error(event)
782
783 1
    def handle_flow_mod_error(self, event):
784
        """Handle flow mod errors related to an EVC."""
785 1
        flow = event.content["flow"]
786 1
        command = event.content.get("error_command")
787 1
        if command != "add":
788
            return
789 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
790 1
        if evc:
791 1
            evc.remove_current_flows()
792
793 1
    def _evc_dict_with_instances(self, evc_dict):
794
        """Convert some dict values to instance of EVC classes.
795
796
        This method will convert: [UNI, Link]
797
        """
798 1
        data = evc_dict.copy()  # Do not modify the original dict
799 1
        for attribute, value in data.items():
800
            # Get multiple attributes.
801
            # Ex: uni_a, uni_z
802 1
            if "uni" in attribute:
803 1
                try:
804 1
                    data[attribute] = self._uni_from_dict(value)
805 1
                except ValueError as exception:
806 1
                    result = "Error creating UNI: Invalid value"
807 1
                    raise ValueError(result) from exception
808
809 1
            if attribute == "circuit_scheduler":
810 1
                data[attribute] = []
811 1
                for schedule in value:
812 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
813
814
            # Get multiple attributes.
815
            # Ex: primary_links,
816
            #     backup_links,
817
            #     current_links_cache,
818
            #     primary_links_cache,
819
            #     backup_links_cache
820 1
            if "links" in attribute:
821 1
                data[attribute] = [
822
                    self._link_from_dict(link) for link in value
823
                ]
824
825
            # Ex: current_path,
826
            #     primary_path,
827
            #     backup_path
828 1
            if "path" in attribute and attribute != "dynamic_backup_path":
829 1
                data[attribute] = Path(
830
                    [self._link_from_dict(link) for link in value]
831
                )
832
833 1
        return data
834
835 1
    def _evc_from_dict(self, evc_dict):
836 1
        data = self._evc_dict_with_instances(evc_dict)
837 1
        return EVC(self.controller, **data)
838
839 1
    def _uni_from_dict(self, uni_dict):
840
        """Return a UNI object from python dict."""
841 1
        if uni_dict is None:
842 1
            return False
843
844 1
        interface_id = uni_dict.get("interface_id")
845 1
        interface = self.controller.get_interface_by_id(interface_id)
846 1
        if interface is None:
847 1
            result = (
848
                "Error creating UNI:"
849
                + f"Could not instantiate interface {interface_id}"
850
            )
851 1
            raise ValueError(result) from ValueError
852
853 1
        tag_dict = uni_dict.get("tag", None)
854 1
        if tag_dict:
855 1
            tag = TAG.from_dict(tag_dict)
856
        else:
857 1
            tag = None
858 1
        uni = UNI(interface, tag)
859
860 1
        return uni
861
862 1
    def _link_from_dict(self, link_dict):
863
        """Return a Link object from python dict."""
864 1
        id_a = link_dict.get("endpoint_a").get("id")
865 1
        id_b = link_dict.get("endpoint_b").get("id")
866
867 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
868 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
869
870 1
        link = Link(endpoint_a, endpoint_b)
871 1
        if "metadata" in link_dict:
872
            link.extend_metadata(link_dict.get("metadata"))
873
874 1
        s_vlan = link.get_metadata("s_vlan")
875 1
        if s_vlan:
876
            tag = TAG.from_dict(s_vlan)
877
            if tag is False:
878
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
879
                raise ValueError(error_msg)
880
            link.update_metadata("s_vlan", tag)
881 1
        return link
882
883 1
    def _find_evc_by_schedule_id(self, schedule_id):
884
        """
885
        Find an EVC and CircuitSchedule based on schedule_id.
886
887
        :param schedule_id: Schedule ID
888
        :return: EVC and Schedule
889
        """
890 1
        circuits = self._get_circuits_buffer()
891 1
        found_schedule = None
892 1
        evc = None
893
894
        # pylint: disable=unused-variable
895 1
        for c_id, circuit in circuits.items():
896 1
            for schedule in circuit.circuit_scheduler:
897 1
                if schedule.id == schedule_id:
898 1
                    found_schedule = schedule
899 1
                    evc = circuit
900 1
                    break
901 1
            if found_schedule:
902 1
                break
903 1
        return evc, found_schedule
904
905 1
    def _get_circuits_buffer(self):
906
        """
907
        Return the circuit buffer.
908
909
        If the buffer is empty, try to load data from mongodb.
910
        """
911 1
        if not self.circuits:
912
            # Load circuits from mongodb to buffer
913 1
            circuits = self.mongo_controller.get_circuits()['circuits']
914 1
            for c_id, circuit in circuits.items():
915 1
                evc = self._evc_from_dict(circuit)
916 1
                self.circuits[c_id] = evc
917 1
        return self.circuits
918
919 1
    @staticmethod
920 1
    def _json_from_request(caller):
921
        """Return a json from request.
922
923
        If it was not possible to get a json from the request, log, for debug,
924
        who was the caller and the error that ocurred, and raise an
925
        Exception.
926
        """
927 1
        try:
928 1
            json_data = request.get_json()
929
        except ValueError as exception:
930
            log.error(exception)
931
            log.debug(f"{caller} result {exception} 400")
932
            raise BadRequest(str(exception)) from BadRequest
933
        except BadRequest:
934
            result = "The request is not a valid JSON."
935
            log.debug(f"{caller} result {result} 400")
936
            raise BadRequest(result) from BadRequest
937 1
        if json_data is None:
938 1
            result = "Content-Type must be application/json"
939 1
            log.debug(f"{caller} result {result} 415")
940 1
            raise UnsupportedMediaType(result)
941
        return json_data
942