Passed
Pull Request — master (#175)
by Italo Valcy
03:26
created

build.main.Main.handle_link_down()   F

Complexity

Conditions 15

Size

Total Lines 69
Code Lines 53

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 41
CRAP Score 15

Importance

Changes 0
Metric Value
eloc 53
dl 0
loc 69
ccs 41
cts 41
cp 1
rs 2.9998
c 0
b 0
f 0
cc 15
nop 2
crap 15
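
The crap value above is consistent with the commonly cited CRAP formula: complexity squared, times the cube of the uncovered fraction, plus complexity. A minimal sketch, assuming that formula is the one used here and reading cp as the covered fraction; crap_score is a hypothetical helper name, not part of the report:

```python
def crap_score(complexity, coverage):
    """Return the CRAP score for one method.

    Assumed formula: complexity**2 * (1 - coverage)**3 + complexity,
    where coverage is a fraction between 0 and 1.
    """
    if not 0 <= coverage <= 1:
        raise ValueError("coverage must be a fraction between 0 and 1")
    return complexity ** 2 * (1 - coverage) ** 3 + complexity


# Values taken from the table: cc = 15, cp = 1 (fully covered).
fully_covered = crap_score(15, 1.0)

# The same method with no tests at all, for comparison.
uncovered = crap_score(15, 0.0)
```

With full coverage the score reduces to the complexity itself, so only lowering the number of conditions would lower it further.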

How to fix

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.
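
As a rough illustration of extracting a commented block into a named method, the sketch below pulls the EVC classification loop of handle_link_down() into its own function. FakeEVC and classify_evcs are hypothetical names used only for this example, and the boolean fields stand in for the real is_affected_by_link(), failover_path and is_failover_path_affected_by_link() checks. It is a sketch under those assumptions, not the project's implementation.

```python
from dataclasses import dataclass


@dataclass
class FakeEVC:
    """Hypothetical stand-in for an EVC, holding only what the sketch needs."""

    name: str
    affected: bool            # stands in for evc.is_affected_by_link(link)
    has_failover: bool        # stands in for a non-empty evc.failover_path
    failover_affected: bool   # stands in for is_failover_path_affected_by_link


def classify_evcs(evcs):
    """Split EVCs into (with_failover, normal, check_failover) lists."""
    with_failover, normal, check_failover = [], [], []
    for evc in evcs:
        if not evc.affected:
            # Current path is fine; only the failover path may need a check.
            check_failover.append(evc)
        elif not evc.has_failover or evc.failover_affected:
            # No usable failover path: handle the link down the regular way.
            normal.append(evc)
        else:
            with_failover.append(evc)
    return with_failover, normal, check_failover
```

The caller would then read as three short steps (classify, install failover flows, notify), and each branch of the classification can be tested without a controller.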

Complexity

Complex methods like build.main.Main.handle_link_down() often do a lot of different things. To break such a method, or the class around it, down, we need to identify a cohesive component within the class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
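
One cohesive component in handle_link_down() is the loop that sends flows in batches. The sketch below shows how that loop could move into a small class of its own. FlowBatcher and rounds() are hypothetical names, the batch size is assumed to be a positive integer, and the sketch keeps the batch size fixed from round to round, which may differ in detail from the original loop.

```python
class FlowBatcher:
    """Hypothetical helper that splits per-switch flows into rounds."""

    def __init__(self, batch_size):
        if batch_size <= 0:
            raise ValueError("batch_size must be a positive integer")
        self.batch_size = batch_size

    def rounds(self, switch_flows):
        """Yield one list of (dpid, flows) pairs per round.

        Each pair carries at most batch_size flows, so a caller can emit
        one install event per pair and pause between rounds.
        """
        # Copy the lists so the caller's dictionary is left untouched.
        pending = {
            dpid: list(flows) for dpid, flows in switch_flows.items() if flows
        }
        while pending:
            current = []
            for dpid in list(pending):
                current.append((dpid, pending[dpid][:self.batch_size]))
                pending[dpid] = pending[dpid][self.batch_size:]
                if not pending[dpid]:
                    del pending[dpid]
            yield current
```

A caller would loop over rounds(), emit an event for each pair, and sleep between rounds, which leaves the event handler itself with far fewer conditions.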

# pylint: disable=protected-access
"""Main module of kytos/mef_eline Kytos Network Application.

NApp to provision circuits from user request.
"""
import time
from threading import Lock

from flask import jsonify, request
from werkzeug.exceptions import (BadRequest, Conflict, Forbidden,
                                 MethodNotAllowed, NotFound,
                                 UnsupportedMediaType)

from kytos.core import KytosNApp, log, rest
from kytos.core.events import KytosEvent
from kytos.core.helpers import listen_to
from kytos.core.interface import TAG, UNI
from kytos.core.link import Link
from napps.kytos.mef_eline import controllers, settings
from napps.kytos.mef_eline.exceptions import InvalidPath
from napps.kytos.mef_eline.models import EVC, DynamicPathManager, Path
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
from napps.kytos.mef_eline.utils import emit_event, load_spec, validate


# pylint: disable=too-many-public-methods
class Main(KytosNApp):
    """Main class of amlight/mef_eline NApp.

    This class is the entry point for this napp.
    """

    spec = load_spec()

    def setup(self):
        """Replace the '__init__' method for the KytosNApp subclass.

        The setup method is automatically called by the controller when your
        application is loaded.

        So, if you have any setup routine, insert it here.
        """
        # object used to schedule circuit events
        self.sched = Scheduler()

        # object to save and load circuits
        self.mongo_controller = self.get_eline_controller()
        self.mongo_controller.bootstrap_indexes()

        # set the controller that will manage the dynamic paths
        DynamicPathManager.set_controller(self.controller)

        # dictionary of EVCs created. It acts as a circuit buffer.
        # Every create/update/delete must be synced to mongodb.
        self.circuits = {}

        self._lock = Lock()
        self.execution_rounds = -1
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

        self.load_all_evcs()

    @staticmethod
    def get_eline_controller():
        """Return the ELineController instance."""
        return controllers.ELineController()

    def execute(self):
        """Execute once when the napp is running."""
        if self.execution_rounds < 0:
            self.execution_rounds += 1
            return
        if self._lock.locked():
            return
        log.debug("Starting consistency routine")
        with self._lock:
            self.execute_consistency()
        log.debug("Finished consistency routine")

    def execute_consistency(self):
        """Execute consistency routine."""
        self.execution_rounds += 1
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
        for circuit in tuple(self.circuits.values()):
            stored_circuits.pop(circuit.id, None)
            if (
                circuit.is_enabled()
                and not circuit.is_active()
                and not circuit.lock.locked()
            ):
                if circuit.check_traces():
                    log.info(f"{circuit} enabled but inactive - activating")
                    with circuit.lock:
                        circuit.activate()
                        circuit.sync()
                else:
                    if self.execution_rounds > settings.WAIT_FOR_OLD_PATH:
                        log.info(f"{circuit} enabled but inactive - redeploy")
                        with circuit.lock:
                            circuit.deploy()
        for circuit_id in stored_circuits:
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
            self._load_evc(stored_circuits[circuit_id])

    def shutdown(self):
        """Execute when your napp is unloaded.

        If you have some cleanup procedure, insert it here.
        """

    @rest("/v2/evc/", methods=["GET"])
    def list_circuits(self):
        """Endpoint to return circuits stored.

        If archived is set to True return all circuits, else only the ones
        not archived.
        """
        log.debug("list_circuits /v2/evc")
        archived = request.args.get("archived", False)
        circuits = self.mongo_controller.get_circuits()['circuits']
        if not circuits:
            return jsonify({}), 200
        if archived:
            return jsonify(circuits), 200
        return (
            jsonify(
                {
                    circuit_id: circuit
                    for circuit_id, circuit in circuits.items()
                    if not circuit.get("archived", False)
                }
            ),
            200,
        )

    @rest("/v2/evc/<circuit_id>", methods=["GET"])
    def get_circuit(self, circuit_id):
        """Endpoint to return a circuit based on id."""
        log.debug("get_circuit /v2/evc/%s", circuit_id)
        circuits = self.mongo_controller.get_circuits()['circuits']

        try:
            result = circuits[circuit_id]
        except KeyError:
            result = f"circuit_id {circuit_id} not found"
            log.debug("get_circuit result %s %s", result, 404)
            raise NotFound(result) from KeyError
        status = 200
        log.debug("get_circuit result %s %s", result, status)
        return jsonify(result), status

    @rest("/v2/evc/", methods=["POST"])
    @validate(spec)
    def create_circuit(self, data):
        """Try to create a new circuit.

        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
        UNI_Z's requested C-VID are available from the interfaces' pools. This
        is checked when creating the UNI object.

        Then, E-Line NApp requests a primary and a backup path to the
        Pathfinder NApp using the attributes primary_links and backup_links
        submitted via REST

        # For each link composing paths in #3:
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
        #  - Using the S-VID obtained, generate abstract flow entries to be
        #    sent to FlowManager

        Push abstract flow entries to FlowManager and FlowManager pushes
        OpenFlow entries to datapaths

        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
        creation

        Finally, notify the user of the status of the request.
        """
        # Try to create the circuit object
        log.debug("create_circuit /v2/evc/")

        try:
            evc = self._evc_from_dict(data)
        except ValueError as exception:
            log.debug("create_circuit result %s %s", exception, 400)
            raise BadRequest(str(exception)) from BadRequest

        if evc.primary_path:
            try:
                evc.primary_path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise BadRequest(
                    f"primary_path is not valid: {exception}"
                ) from exception
        if evc.backup_path:
            try:
                evc.backup_path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise BadRequest(
                    f"backup_path is not valid: {exception}"
                ) from exception

        # verify duplicated evc
        if self._is_duplicated_evc(evc):
            result = "The EVC already exists."
            log.debug("create_circuit result %s %s", result, 409)
            raise Conflict(result)

        if (
            not evc.primary_path
            and evc.dynamic_backup_path is False
            and evc.uni_a.interface.switch != evc.uni_z.interface.switch
        ):
            result = "The EVC must have a primary path or allow dynamic paths."
            log.debug("create_circuit result %s %s", result, 400)
            raise BadRequest(result)

        # store circuit in dictionary
        self.circuits[evc.id] = evc

        # save circuit
        evc.sync()

        # Schedule the circuit deploy
        self.sched.add(evc)

        # Circuit has no schedule, deploy now
        if not evc.circuit_scheduler:
            with evc.lock:
                evc.deploy()

        # Notify users
        event = KytosEvent(
            name="kytos.mef_eline.created", content=evc.as_dict()
        )
        self.controller.buffers.app.put(event)

        result = {"circuit_id": evc.id}
        status = 201
        log.debug("create_circuit result %s %s", result, status)
        emit_event(self.controller, "created", evc_id=evc.id)
        return jsonify(result), status

    @rest("/v2/evc/<circuit_id>", methods=["PATCH"])
    def update(self, circuit_id):
        """Update a circuit based on payload.

        The EVC required attributes (name, uni_a, uni_z) can't be updated.
        """
        log.debug("update /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError:
            result = f"circuit_id {circuit_id} not found"
            log.debug("update result %s %s", result, 404)
            raise NotFound(result) from NotFound

        if evc.archived:
            result = "Can't update archived EVC"
            log.debug("update result %s %s", result, 405)
            raise MethodNotAllowed(["GET"], result)

        try:
            data = request.get_json()
        except BadRequest:
            result = "The request body is not a well-formed JSON."
            log.debug("update result %s %s", result, 400)
            raise BadRequest(result) from BadRequest
        if data is None:
            result = "The request body mimetype is not application/json."
            log.debug("update result %s %s", result, 415)
            raise UnsupportedMediaType(result) from UnsupportedMediaType

        try:
            enable, redeploy = evc.update(
                **self._evc_dict_with_instances(data)
            )
        except ValueError as exception:
            log.error(exception)
            log.debug("update result %s %s", exception, 400)
            raise BadRequest(str(exception)) from BadRequest

        if evc.is_active():
            if enable is False:  # disable if active
                with evc.lock:
                    evc.remove()
            elif redeploy is not None:  # redeploy if active
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        else:
            if enable is True:  # enable if inactive
                with evc.lock:
                    evc.deploy()
        result = {evc.id: evc.as_dict()}
        status = 200

        log.debug("update result %s %s", result, status)
        emit_event(self.controller, "updated", evc_id=evc.id, data=data)
        return jsonify(result), status

    @rest("/v2/evc/<circuit_id>", methods=["DELETE"])
    def delete_circuit(self, circuit_id):
        """Remove a circuit.

        First, the flows are removed from the switches, and then the EVC is
        disabled.
        """
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError:
            result = f"circuit_id {circuit_id} not found"
            log.debug("delete_circuit result %s %s", result, 404)
            raise NotFound(result) from NotFound

        if evc.archived:
            result = f"Circuit {circuit_id} already removed"
            log.debug("delete_circuit result %s %s", result, 404)
            raise NotFound(result) from NotFound

        log.info("Removing %s", evc)
        with evc.lock:
            evc.remove_current_flows()
            evc.deactivate()
            evc.disable()
            self.sched.remove(evc)
            evc.archive()
            evc.sync()
        log.info("EVC removed. %s", evc)
        result = {"response": f"Circuit {circuit_id} removed"}
        status = 200

        log.debug("delete_circuit result %s %s", result, status)
        emit_event(self.controller, "deleted", evc_id=evc.id)
        return jsonify(result), status

    @rest("v2/evc/<circuit_id>/metadata", methods=["GET"])
    def get_metadata(self, circuit_id):
        """Get metadata from an EVC."""
        try:
            return (
                jsonify({"metadata": self.circuits[circuit_id].metadata}),
                200,
            )
        except KeyError as error:
            raise NotFound(f"circuit_id {circuit_id} not found.") from error

    @rest("v2/evc/<circuit_id>/metadata", methods=["POST"])
    def add_metadata(self, circuit_id):
        """Add metadata to an EVC."""
        try:
            metadata = request.get_json()
            content_type = request.content_type
        except BadRequest as error:
            result = "The request body is not a well-formed JSON."
            raise BadRequest(result) from error
        if content_type is None:
            result = "The request body is empty."
            raise BadRequest(result)
        if metadata is None:
            if content_type != "application/json":
                result = (
                    "The content type must be application/json "
                    f"(received {content_type})."
                )
            else:
                result = "Metadata is empty."
            raise UnsupportedMediaType(result)

        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise NotFound(f"circuit_id {circuit_id} not found.") from error

        evc.extend_metadata(metadata)
        evc.sync()
        return jsonify("Operation successful"), 201

    @rest("v2/evc/<circuit_id>/metadata/<key>", methods=["DELETE"])
    def delete_metadata(self, circuit_id, key):
        """Delete metadata from an EVC."""
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise NotFound(f"circuit_id {circuit_id} not found.") from error

        evc.remove_metadata(key)
        evc.sync()
        return jsonify("Operation successful"), 200

    @rest("/v2/evc/<circuit_id>/redeploy", methods=["PATCH"])
    def redeploy(self, circuit_id):
        """Endpoint to force the redeployment of an EVC."""
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError:
            result = f"circuit_id {circuit_id} not found"
            raise NotFound(result) from NotFound
        if evc.is_enabled():
            with evc.lock:
                evc.remove_current_flows()
                evc.deploy()
            result = {"response": f"Circuit {circuit_id} redeploy received."}
            status = 202
        else:
            result = {"response": f"Circuit {circuit_id} is disabled."}
            status = 409

        return jsonify(result), status

    @rest("/v2/evc/schedule", methods=["GET"])
    def list_schedules(self):
        """Endpoint to return all schedules stored for all circuits.

        Return a JSON with the following template:
        [{"schedule_id": <schedule_id>,
         "circuit_id": <circuit_id>,
         "schedule": <schedule object>}]
        """
        log.debug("list_schedules /v2/evc/schedule")
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
        if not circuits:
            result = {}
            status = 200
            return jsonify(result), status

        result = []
        status = 200
        for circuit in circuits:
            circuit_scheduler = circuit.get("circuit_scheduler")
            if circuit_scheduler:
                for scheduler in circuit_scheduler:
                    value = {
                        "schedule_id": scheduler.get("id"),
                        "circuit_id": circuit.get("id"),
                        "schedule": scheduler,
                    }
                    result.append(value)

        log.debug("list_schedules result %s %s", result, status)
        return jsonify(result), status

    @rest("/v2/evc/schedule/", methods=["POST"])
    def create_schedule(self):
        """
        Create a new schedule for a given circuit.

        This service does not check whether there are conflicts with another
        schedule.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }
        """
        log.debug("create_schedule /v2/evc/schedule/")

        json_data = self._json_from_request("create_schedule")
        try:
            circuit_id = json_data["circuit_id"]
        except TypeError:
            result = "The payload should have a dictionary."
            log.debug("create_schedule result %s %s", result, 400)
            raise BadRequest(result) from BadRequest
        except KeyError:
            result = "Missing circuit_id."
            log.debug("create_schedule result %s %s", result, 400)
            raise BadRequest(result) from BadRequest

        try:
            schedule_data = json_data["schedule"]
        except KeyError:
            result = "Missing schedule data."
            log.debug("create_schedule result %s %s", result, 400)
            raise BadRequest(result) from BadRequest

        # Get EVC from circuits buffer
        circuits = self._get_circuits_buffer()

        # get the circuit
        evc = circuits.get(circuit_id)

        # fail if the circuit does not exist
        if not evc:
            result = f"circuit_id {circuit_id} not found"
            log.debug("create_schedule result %s %s", result, 404)
            raise NotFound(result) from NotFound
        # Cannot modify deleted or archived circuits
        if evc.archived:
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
            log.debug("create_schedule result %s %s", result, 403)
            raise Forbidden(result) from Forbidden

        # new schedule from dict
        new_schedule = CircuitSchedule.from_dict(schedule_data)

        # If there is no schedule, create the list
        if not evc.circuit_scheduler:
            evc.circuit_scheduler = []

        # Add the new schedule
        evc.circuit_scheduler.append(new_schedule)

        # Add schedule job
        self.sched.add_circuit_job(evc, new_schedule)

        # save circuit to mongodb
        evc.sync()

        result = new_schedule.as_dict()
        status = 201

        log.debug("create_schedule result %s %s", result, status)
        return jsonify(result), status

    @rest("/v2/evc/schedule/<schedule_id>", methods=["PATCH"])
    def update_schedule(self, schedule_id):
        """Update a schedule.

        Change all attributes of the given schedule of an EVC.
        The schedule ID is preserved by default.
        Payload example:
            {
              "date": "2019-08-07T14:52:10.967Z",
              "interval": "string",
              "frequency": "1 * * * *",
              "action": "create"
            }
        """
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)

        # Try to find a circuit schedule
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)

        # Cannot modify deleted or archived circuits
        if not found_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("update_schedule result %s %s", result, 404)
            raise NotFound(result) from NotFound
        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("update_schedule result %s %s", result, 403)
            raise Forbidden(result) from Forbidden

        data = self._json_from_request("update_schedule")

        new_schedule = CircuitSchedule.from_dict(data)
        new_schedule.id = found_schedule.id
        # Remove the old schedule
        evc.circuit_scheduler.remove(found_schedule)
        # Append the modified schedule
        evc.circuit_scheduler.append(new_schedule)

        # Cancel all schedule jobs
        self.sched.cancel_job(found_schedule.id)
        # Add the new circuit schedule
        self.sched.add_circuit_job(evc, new_schedule)
        # Save EVC to mongodb
        evc.sync()

        result = new_schedule.as_dict()
        status = 200

        log.debug("update_schedule result %s %s", result, status)
        return jsonify(result), status

    @rest("/v2/evc/schedule/<schedule_id>", methods=["DELETE"])
    def delete_schedule(self, schedule_id):
        """Remove a circuit schedule.

        Remove the Schedule from EVC.
        Remove the Schedule from cron job.
        Save the EVC to mongodb.
        """
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)

        # Cannot modify deleted or archived circuits
        if not found_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("delete_schedule result %s %s", result, 404)
            raise NotFound(result)

        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("delete_schedule result %s %s", result, 403)
            raise Forbidden(result)

        # Remove the old schedule
        evc.circuit_scheduler.remove(found_schedule)

        # Cancel all schedule jobs
        self.sched.cancel_job(found_schedule.id)
        # Save EVC to mongodb
        evc.sync()

        result = "Schedule removed"
        status = 200

        log.debug("delete_schedule result %s %s", result, status)
        return jsonify(result), status

    def _is_duplicated_evc(self, evc):
        """Verify if the circuit given is duplicated with the stored evcs.

        Args:
            evc (EVC): circuit to be analysed.

        Returns:
            boolean: True if the circuit is duplicated, otherwise False.

        """
        for circuit in tuple(self.circuits.values()):
            if not circuit.archived and circuit.shares_uni(evc):
                return True
        return False

    @listen_to("kytos/topology.link_up")
    def on_link_up(self, event):
        """Change circuit when link is up or end_maintenance."""
        self.handle_link_up(event)

    def handle_link_up(self, event):
        """Change circuit when link is up or end_maintenance."""
        log.debug("Event handle_link_up %s", event)
        for evc in self.circuits.values():
            if evc.is_enabled() and not evc.archived:
                with evc.lock:
                    evc.handle_link_up(event.content["link"])

    @listen_to("kytos/topology.link_down")
    def on_link_down(self, event):
        """Change circuit when link is down or under_maintenance."""
        self.handle_link_down(event)

    def handle_link_down(self, event):
        """Change circuit when link is down or under_maintenance."""
        link = event.content["link"]
        log.info("Event handle_link_down %s", link)
        switch_flows = {}
        evcs_with_failover = []
        evcs_normal = []
        check_failover = []
        for evc in self.circuits.values():
            if evc.is_affected_by_link(link):
                # if there is no failover path, handle link down the
                # traditional way
                if (
                    not getattr(evc, 'failover_path', None) or
                    evc.is_failover_path_affected_by_link(link)
                ):
                    evcs_normal.append(evc)
                    continue
                for dpid, flows in evc.get_failover_flows().items():
                    switch_flows.setdefault(dpid, [])
                    switch_flows[dpid].extend(flows)
                evcs_with_failover.append(evc)
            else:
                check_failover.append(evc)

        offset = 0
        while switch_flows:
            offset = (offset + settings.BATCH_SIZE) or None
            switches = list(switch_flows.keys())
            for dpid in switches:
                emit_event(
                    self.controller,
                    context="kytos.flow_manager",
                    name="flows.install",
                    dpid=dpid,
                    flow_dict={"flows": switch_flows[dpid][:offset]},
                )
                if offset is None or offset >= len(switch_flows[dpid]):
                    del switch_flows[dpid]
                    continue
                switch_flows[dpid] = switch_flows[dpid][offset:]
            time.sleep(settings.BATCH_INTERVAL)

        for evc in evcs_with_failover:
            with evc.lock:
                old_path = evc.current_path
                evc.current_path = evc.failover_path
                evc.failover_path = old_path
                evc.sync()
            emit_event(self.controller, "redeployed_link_down", evc_id=evc.id)
            log.info(
                f"{evc} redeployed with failover due to link down {link.id}"
            )

        for evc in evcs_normal:
            emit_event(
                self.controller,
                "evc_affected_by_link_down",
                evc_id=evc.id,
                link_id=link.id,
            )

        # After handling the hot path, check if new failover paths are needed.
        # Note that EVCs affected by link down will generate a KytosEvent for
        # deployed|redeployed, which will trigger the failover path setup.
        # Thus, we just need to further check the check_failover list
        for evc in check_failover:
            if evc.is_failover_path_affected_by_link(link):
                evc.setup_failover_path()
718
719 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
720 1
    def on_evc_affected_by_link_down(self, event):
721
        """Change circuit when link is down or under_mantenance."""
722
        self.handle_evc_affected_by_link_down(event)
723
724 1
    def handle_evc_affected_by_link_down(self, event):
725
        """Change circuit when link is down or under_mantenance."""
726 1
        evc = self.circuits.get(event.content["evc_id"])
727 1
        link_id = event.content['link_id']
728 1
        if not evc:
729 1
            return
730 1
        with evc.lock:
731 1
            result = evc.handle_link_down()
732 1
        event_name = "error_redeploy_link_down"
733 1
        if result:
734 1
            log.info(f"{evc} redeployed due to link down {link_id}")
735 1
            event_name = "redeployed_link_down"
736 1
        emit_event(self.controller, event_name, evc_id=evc.id)
737
738 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
    def on_evc_deployed(self, event):
        """Handle EVC deployed|redeployed_link_down."""
        self.handle_evc_deployed(event)

    def handle_evc_deployed(self, event):
        """Setup failover path on evc deployed."""
        evc = self.circuits.get(event.content["evc_id"])
        if not evc:
            return
        with evc.lock:
            evc.setup_failover_path()
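
The `listen_to` pattern on `on_evc_deployed` reads as a regular expression. The following is a minimal sketch using only the standard `re` module, assuming the pattern is matched against the whole event name; how Kytos performs the match internally is not shown in this listing, and `matches` is a hypothetical helper.

```python
import re

# Pattern copied verbatim from the listen_to decorator on on_evc_deployed.
PATTERN = r"kytos/mef_eline.(redeployed_link_(up|down)|deployed)"


def matches(event_name):
    """Return True when the whole event name matches PATTERN.

    Full-string matching is an assumption made for this sketch.
    """
    return re.fullmatch(PATTERN, event_name) is not None


if __name__ == "__main__":
    for name in (
        "kytos/mef_eline.deployed",
        "kytos/mef_eline.redeployed_link_up",
        "kytos/mef_eline.redeployed_link_down",
        "kytos/mef_eline.error_redeploy_link_down",
    ):
        print(name, matches(name))
```

Under that assumption, `error_redeploy_link_down` does not trigger the failover setup. Note also that the unescaped `.` in the pattern matches any single character, not only a literal dot.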

    @listen_to("kytos/topology.topology_loaded")
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
        """Load EVCs once the topology is available."""
        self.load_all_evcs()

    def load_all_evcs(self):
        """Try to load all EVCs on startup."""
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
        for circuit_id, circuit in circuits:
            if circuit_id not in self.circuits:
                self._load_evc(circuit)

    def _load_evc(self, circuit_dict):
        """Load one EVC from mongodb to memory."""
        try:
            evc = self._evc_from_dict(circuit_dict)
        except ValueError as exception:
            log.error(
                f"Could not load EVC: dict={circuit_dict} error={exception}"
            )
            return None

        if evc.archived:
            return None
        evc.deactivate()
        evc.sync()
        self.circuits.setdefault(evc.id, evc)
        self.sched.add(evc)
        return evc

    @listen_to("kytos/flow_manager.flow.error")
    def on_flow_mod_error(self, event):
        """Handle flow mod errors related to an EVC."""
        self.handle_flow_mod_error(event)

    def handle_flow_mod_error(self, event):
        """Handle flow mod errors related to an EVC."""
        flow = event.content["flow"]
        command = event.content.get("error_command")
        if command != "add":
            return
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
        if evc:
            evc.remove_current_flows()

    def _evc_dict_with_instances(self, evc_dict):
        """Convert some dict values to instances of EVC classes.

        This method will convert: [UNI, Link]
        """
        data = evc_dict.copy()  # Do not modify the original dict
        for attribute, value in data.items():
            # Get multiple attributes.
            # Ex: uni_a, uni_z
            if "uni" in attribute:
                try:
                    data[attribute] = self._uni_from_dict(value)
                except ValueError as exception:
                    result = "Error creating UNI: Invalid value"
                    raise ValueError(result) from exception

            if attribute == "circuit_scheduler":
                data[attribute] = []
                for schedule in value:
                    data[attribute].append(CircuitSchedule.from_dict(schedule))

            # Get multiple attributes.
            # Ex: primary_links,
            #     backup_links,
            #     current_links_cache,
            #     primary_links_cache,
            #     backup_links_cache
            if "links" in attribute:
                data[attribute] = [
                    self._link_from_dict(link) for link in value
                ]

            # Ex: current_path,
            #     primary_path,
            #     backup_path
            if "path" in attribute and attribute != "dynamic_backup_path":
                data[attribute] = Path(
                    [self._link_from_dict(link) for link in value]
                )

        return data
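
The loop in `_evc_dict_with_instances` picks a conversion by substring tests on the attribute name. The sketch below isolates just that routing decision so it can be checked without Kytos; `converters_for` and its string labels are hypothetical names introduced for illustration, and the real method converts values rather than returning labels.

```python
def converters_for(attribute):
    """Return labels for the conversions the loop would apply to this key.

    Mirrors the four conditions in _evc_dict_with_instances; the labels
    themselves are made up for this sketch.
    """
    applied = []
    if "uni" in attribute:
        applied.append("uni")
    if attribute == "circuit_scheduler":
        applied.append("circuit_scheduler")
    if "links" in attribute:
        applied.append("links")
    if "path" in attribute and attribute != "dynamic_backup_path":
        applied.append("path")
    return applied


if __name__ == "__main__":
    for key in ("uni_a", "primary_links", "backup_path",
                "dynamic_backup_path", "name"):
        print(key, converters_for(key))
```

Because the tests are substring-based, any key that merely contains `uni`, `links`, or `path` is routed to a converter; a made-up key such as `unique_id` would be treated as a UNI. The explicit exclusion of `dynamic_backup_path` exists for the same reason.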

    def _evc_from_dict(self, evc_dict):
        """Return an EVC object from python dict."""
        data = self._evc_dict_with_instances(evc_dict)
        return EVC(self.controller, **data)

    def _uni_from_dict(self, uni_dict):
        """Return a UNI object from python dict."""
        if uni_dict is None:
            return False

        interface_id = uni_dict.get("interface_id")
        interface = self.controller.get_interface_by_id(interface_id)
        if interface is None:
            result = (
                "Error creating UNI: "
                + f"Could not instantiate interface {interface_id}"
            )
            # No underlying exception exists here, so nothing to chain from.
            raise ValueError(result)

        tag_dict = uni_dict.get("tag", None)
        if tag_dict:
            tag = TAG.from_dict(tag_dict)
        else:
            tag = None
        uni = UNI(interface, tag)

        return uni

    def _link_from_dict(self, link_dict):
        """Return a Link object from python dict."""
        id_a = link_dict.get("endpoint_a").get("id")
        id_b = link_dict.get("endpoint_b").get("id")

        endpoint_a = self.controller.get_interface_by_id(id_a)
        endpoint_b = self.controller.get_interface_by_id(id_b)

        link = Link(endpoint_a, endpoint_b)
        if "metadata" in link_dict:
            link.extend_metadata(link_dict.get("metadata"))

        s_vlan = link.get_metadata("s_vlan")
        if s_vlan:
            tag = TAG.from_dict(s_vlan)
            if tag is False:
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
                raise ValueError(error_msg)
            link.update_metadata("s_vlan", tag)
        return link

    def _find_evc_by_schedule_id(self, schedule_id):
        """
        Find an EVC and CircuitSchedule based on schedule_id.

        :param schedule_id: Schedule ID
        :return: EVC and Schedule
        """
        circuits = self._get_circuits_buffer()
        found_schedule = None
        evc = None

        # pylint: disable=unused-variable
        for c_id, circuit in circuits.items():
            for schedule in circuit.circuit_scheduler:
                if schedule.id == schedule_id:
                    found_schedule = schedule
                    evc = circuit
                    break
            if found_schedule:
                break
        return evc, found_schedule
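
The nested loop with two `break` statements in `_find_evc_by_schedule_id` is a first-match search. One possible alternative, sketched here as a suggestion and not as the NApp's actual code, expresses the same lookup with a generator and `next()`. `Schedule`, `Circuit`, and `find_by_schedule_id` are hypothetical stand-ins for `CircuitSchedule`, `EVC`, and the method above.

```python
from dataclasses import dataclass, field


@dataclass
class Schedule:
    """Hypothetical stand-in for CircuitSchedule; only `id` is modelled."""
    id: str


@dataclass
class Circuit:
    """Hypothetical stand-in for EVC; only the scheduler list is modelled."""
    name: str
    circuit_scheduler: list = field(default_factory=list)


def find_by_schedule_id(circuits, schedule_id):
    """Return (circuit, schedule) for the first match, else (None, None)."""
    pairs = (
        (circuit, schedule)
        for circuit in circuits.values()  # keys are not needed for the search
        for schedule in circuit.circuit_scheduler
        if schedule.id == schedule_id
    )
    # next() stops at the first hit, like the two break statements.
    return next(pairs, (None, None))


if __name__ == "__main__":
    demo = {
        "a": Circuit("a", [Schedule("1")]),
        "b": Circuit("b", [Schedule("2"), Schedule("3")]),
    }
    print(find_by_schedule_id(demo, "3"))
    print(find_by_schedule_id(demo, "missing"))
```

Iterating over `.values()` also removes the unused `c_id` variable, so the `unused-variable` pylint suppression would no longer be needed in such a variant.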

    def _get_circuits_buffer(self):
        """
        Return the circuit buffer.

        If the buffer is empty, try to load data from mongodb.
        """
        if not self.circuits:
            # Load circuits from mongodb to buffer
            circuits = self.mongo_controller.get_circuits()['circuits']
            for c_id, circuit in circuits.items():
                evc = self._evc_from_dict(circuit)
                self.circuits[c_id] = evc
        return self.circuits

    @staticmethod
    def _json_from_request(caller):
        """Return a json from request.

        If it was not possible to get a json from the request, log, for debug,
        who was the caller and the error that occurred, and raise an
        Exception.
        """
        try:
            json_data = request.get_json()
        except ValueError as exception:
            log.error(exception)
            log.debug(f"{caller} result {exception} 400")
            # Chain from the caught exception instance, not the class.
            raise BadRequest(str(exception)) from exception
        except BadRequest as exception:
            result = "The request is not a valid JSON."
            log.debug(f"{caller} result {result} 400")
            raise BadRequest(result) from exception
        if json_data is None:
            result = "Content-Type must be application/json"
            log.debug(f"{caller} result {result} 415")
            raise UnsupportedMediaType(result)
        return json_data