Test Failed
Pull Request — master (#175)
by Italo Valcy
03:23
created

build.main.Main.handle_link_down()   F

Complexity

Conditions 14

Size

Total Lines 82
Code Lines 51

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 35
CRAP Score 20.0506

Importance

Changes 0
Metric Value
eloc 51
dl 0
loc 82
ccs 35
cts 51
cp 0.6863
rs 3.6
c 0
b 0
f 0
cc 14
nop 2
crap 20.0506

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like build.main.Main.handle_link_down() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# pylint: disable=protected-access
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
from threading import Lock, Thread
7
import time
8 1
9 1
from kytos.core.apm import begin_span
10
from elasticapm.traces import execution_context
11
from flask import jsonify, request
12
from werkzeug.exceptions import (BadRequest, Conflict, Forbidden,
13 1
                                 MethodNotAllowed, NotFound,
14 1
                                 UnsupportedMediaType)
15 1
16 1
from kytos.core import KytosNApp, log, rest
17 1
from kytos.core.events import KytosEvent
18 1
from kytos.core.helpers import listen_to
19 1
from kytos.core.interface import TAG, UNI
20 1
from kytos.core.link import Link
21 1
from napps.kytos.mef_eline import controllers, settings
22 1
from napps.kytos.mef_eline.exceptions import InvalidPath
23
from napps.kytos.mef_eline.models import EVC, DynamicPathManager, Path
24
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
25
from napps.kytos.mef_eline.utils import emit_event, load_spec, validate, send_flow_mods
26 1
27
28
# pylint: disable=too-many-public-methods
29
class Main(KytosNApp):
30
    """Main class of amlight/mef_eline NApp.
31
32 1
    This class is the entry point for this napp.
33
    """
34 1
35
    spec = load_spec()
36
37
    def setup(self):
38
        """Replace the '__init__' method for the KytosNApp subclass.
39
40
        The setup method is automatically called by the controller when your
41
        application is loaded.
42
43 1
        So, if you have any setup routine, insert it here.
44
        """
45
        # object used to scheduler circuit events
46 1
        self.sched = Scheduler()
47 1
48
        # object to save and load circuits
49
        self.mongo_controller = self.get_eline_controller()
50 1
        self.mongo_controller.bootstrap_indexes()
51
52
        # set the controller that will manager the dynamic paths
53
        DynamicPathManager.set_controller(self.controller)
54 1
55
        # dictionary of EVCs created. It acts as a circuit buffer.
56 1
        # Every create/update/delete must be synced to mongodb.
57 1
        self.circuits = {}
58 1
59
        self._lock = Lock()
60 1
        self.execution_rounds = -1
61
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)
62 1
63 1
        self.load_all_evcs()
64
65
    @staticmethod
66
    def get_eline_controller():
67 1
        """Return the ELineController instance."""
68
        return controllers.ELineController()
69 1
70
    def execute(self):
71
        """Execute once when the napp is running."""
72 1
        if self.execution_rounds < 0:
73 1
            self.execution_rounds += 1
74 1
            return
75 1
        if self._lock.locked():
76 1
            return
77 1
        log.debug("Starting consistency routine")
78
        with self._lock:
79 1
            self.execute_consistency()
80
        log.debug("Finished consistency routine")
81 1
82 1
    def execute_consistency(self):
83 1
        """Execute consistency routine."""
84 1
        self.execution_rounds += 1
85 1
        #return
86
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
87
        for circuit in tuple(self.circuits.values()):
88
            stored_circuits.pop(circuit.id, None)
89
            if (
90 1
                circuit.is_enabled()
91 1
                and not circuit.is_active()
92 1
                and not circuit.lock.locked()
93 1
            ):
94 1
                if circuit.check_traces():
95
                    log.info(f"{circuit} enabled but inactive - activating")
96 1
                    with circuit.lock:
97 1
                        circuit.activate()
98 1
                        circuit.sync()
99 1
                else:
100 1
                    if self.execution_rounds > settings.WAIT_FOR_OLD_PATH:
101 1
                        log.info(f"{circuit} enabled but inactive - redeploy")
102 1
                        with circuit.lock:
103
                            circuit.deploy()
104 1
        for circuit_id in stored_circuits:
105
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
106
            self._load_evc(stored_circuits[circuit_id])
107
108
    def shutdown(self):
109
        """Execute when your napp is unloaded.
110 1
111 1
        If you have some cleanup procedure, insert it here.
112
        """
113
114
    @rest("/v2/evc/", methods=["GET"])
115
    def list_circuits(self):
116
        """Endpoint to return circuits stored.
117 1
118 1
        If archived is set to True return all circuits, else only the ones
119 1
        not archived.
120 1
        """
121 1
        log.debug("list_circuits /v2/evc")
122 1
        archived = request.args.get("archived", False)
123 1
        circuits = self.mongo_controller.get_circuits()['circuits']
124 1
        if not circuits:
125
            return jsonify({}), 200
126
        if archived:
127
            return jsonify(circuits), 200
128
        return (
129
            jsonify(
130
                {
131
                    circuit_id: circuit
132
                    for circuit_id, circuit in circuits.items()
133
                    if not circuit.get("archived", False)
134
                }
135 1
            ),
136 1
            200,
137
        )
138 1
139 1
    @rest("/v2/evc/<circuit_id>", methods=["GET"])
140
    def get_circuit(self, circuit_id):
141 1
        """Endpoint to return a circuit based on id."""
142 1
        log.debug("get_circuit /v2/evc/%s", circuit_id)
143 1
        circuits = self.mongo_controller.get_circuits()['circuits']
144 1
145 1
        try:
146 1
            result = circuits[circuit_id]
147 1
        except KeyError:
148 1
            result = f"circuit_id {circuit_id} not found"
149 1
            log.debug("get_circuit result %s %s", result, 404)
150
            raise NotFound(result) from KeyError
151 1
        status = 200
152 1
        log.debug("get_circuit result %s %s", result, status)
153 1
        return jsonify(result), status
154
155
    @rest("/v2/evc/", methods=["POST"])
156
    @validate(spec)
157
    def create_circuit(self, data):
158
        """Try to create a new circuit.
159
160
        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
161
        UNI_Z's requested C-VID are available from the interfaces' pools. This
162
        is checked when creating the UNI object.
163
164
        Then, E-Line NApp requests a primary and a backup path to the
165
        Pathfinder NApp using the attributes primary_links and backup_links
166
        submitted via REST
167
168
        # For each link composing paths in #3:
169
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
170
        #  - Using the S-VID obtained, generate abstract flow entries to be
171
        #    sent to FlowManager
172
173
        Push abstract flow entries to FlowManager and FlowManager pushes
174
        OpenFlow entries to datapaths
175
176
        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
177
        creation
178 1
179
        Finnaly, notify user of the status of its request.
180 1
        """
181 1
        # Try to create the circuit object
182 1
        log.debug("create_circuit /v2/evc/")
183 1
184 1
        try:
185
            evc = self._evc_from_dict(data)
186 1
        except ValueError as exception:
187
            log.debug("create_circuit result %s %s", exception, 400)
188
            raise BadRequest(str(exception)) from BadRequest
189
190
        if evc.primary_path:
191
            try:
192
                evc.primary_path.is_valid(
193
                    evc.uni_a.interface.switch,
194
                    evc.uni_z.interface.switch,
195
                    bool(evc.circuit_scheduler),
196
                )
197 1
            except InvalidPath as exception:
198
                raise BadRequest(
199
                    f"primary_path is not valid: {exception}"
200
                ) from exception
201
        if evc.backup_path:
202
            try:
203
                evc.backup_path.is_valid(
204
                    evc.uni_a.interface.switch,
205
                    evc.uni_z.interface.switch,
206
                    bool(evc.circuit_scheduler),
207
                )
208
            except InvalidPath as exception:
209
                raise BadRequest(
210 1
                    f"backup_path is not valid: {exception}"
211 1
                ) from exception
212 1
213 1
        # verify duplicated evc
214
        if self._is_duplicated_evc(evc):
215 1
            result = "The EVC already exists."
216
            log.debug("create_circuit result %s %s", result, 409)
217
            raise Conflict(result)
218
219
        if (
220 1
            not evc.primary_path
221 1
            and evc.dynamic_backup_path is False
222 1
            and evc.uni_a.interface.switch != evc.uni_z.interface.switch
223
        ):
224
            result = "The EVC must have a primary path or allow dynamic paths."
225 1
            log.debug("create_circuit result %s %s", result, 400)
226
            raise BadRequest(result)
227
228 1
        # store circuit in dictionary
229
        self.circuits[evc.id] = evc
230
231 1
        # save circuit
232
        evc.sync()
233
234 1
        # Schedule the circuit deploy
235 1
        self.sched.add(evc)
236 1
237
        # Circuit has no schedule, deploy now
238
        if not evc.circuit_scheduler:
239 1
            with evc.lock:
240
                evc.deploy()
241
242 1
        # Notify users
243
        event = KytosEvent(
244 1
            name="kytos.mef_eline.created", content=evc.as_dict()
245 1
        )
246 1
        self.controller.buffers.app.put(event)
247 1
248 1
        result = {"circuit_id": evc.id}
249
        status = 201
250 1
        log.debug("create_circuit result %s %s", result, status)
251 1
        emit_event(self.controller, "created", evc_id=evc.id)
252
        return jsonify(result), status
253
254
    @rest("/v2/evc/<circuit_id>", methods=["PATCH"])
255
    def update(self, circuit_id):
256 1
        """Update a circuit based on payload.
257 1
258 1
        The EVC required attributes (name, uni_a, uni_z) can't be updated.
259 1
        """
260 1
        log.debug("update /v2/evc/%s", circuit_id)
261 1
        try:
262 1
            evc = self.circuits[circuit_id]
263
        except KeyError:
264 1
            result = f"circuit_id {circuit_id} not found"
265 1
            log.debug("update result %s %s", result, 404)
266 1
            raise NotFound(result) from NotFound
267 1
268
        if evc.archived:
269 1
            result = "Can't update archived EVC"
270 1
            log.debug("update result %s %s", result, 405)
271 1
            raise MethodNotAllowed(["GET"], result)
272 1
273 1
        try:
274 1
            data = request.get_json()
275 1
        except BadRequest:
276 1
            result = "The request body is not a well-formed JSON."
277 1
            log.debug("update result %s %s", result, 400)
278 1
            raise BadRequest(result) from BadRequest
279
        if data is None:
280 1
            result = "The request body mimetype is not application/json."
281 1
            log.debug("update result %s %s", result, 415)
282
            raise UnsupportedMediaType(result) from UnsupportedMediaType
283
284 1
        try:
285 1
            enable, redeploy = evc.update(
286 1
                **self._evc_dict_with_instances(data)
287 1
            )
288
        except ValueError as exception:
289 1
            log.error(exception)
290
            log.debug("update result %s %s", exception, 400)
291
            raise BadRequest(str(exception)) from BadRequest
292
293
        if evc.is_active():
294
            if enable is False:  # disable if active
295
                with evc.lock:
296
                    evc.remove()
297
            elif redeploy is not None:  # redeploy if active
298 1
                with evc.lock:
299 1
                    evc.remove()
300 1
                    evc.deploy()
301 1
        else:
302 1
            if enable is True:  # enable if inactive
303
                with evc.lock:
304 1
                    evc.deploy()
305 1
        result = {evc.id: evc.as_dict()}
306 1
        status = 200
307
308 1
        log.debug("update result %s %s", result, status)
309 1
        emit_event(self.controller, "updated", evc_id=evc.id, data=data)
310
        return jsonify(result), status
311
312
    @rest("/v2/evc/<circuit_id>", methods=["DELETE"])
313
    def delete_circuit(self, circuit_id):
314
        """Remove a circuit.
315 1
316 1
        First, the flows are removed from the switches, and then the EVC is
317 1
        disabled.
318 1
        """
319 1
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
320 1
        try:
321 1
            evc = self.circuits[circuit_id]
322
        except KeyError:
323 1
            result = f"circuit_id {circuit_id} not found"
324 1
            log.debug("delete_circuit result %s %s", result, 404)
325 1
            raise NotFound(result) from NotFound
326 1
327
        if evc.archived:
328 1
            result = f"Circuit {circuit_id} already removed"
329 1
            log.debug("delete_circuit result %s %s", result, 404)
330 1
            raise NotFound(result) from NotFound
331 1
332 1
        log.info("Removing %s", evc)
333 1
        with evc.lock:
334 1
            evc.remove_current_flows()
335 1
            evc.deactivate()
336 1
            evc.disable()
337 1
            self.sched.remove(evc)
338 1
            evc.archive()
339
            evc.sync()
340 1
        log.info("EVC removed. %s", evc)
341 1
        result = {"response": f"Circuit {circuit_id} removed"}
342 1
        status = 200
343
344 1
        log.debug("delete_circuit result %s %s", result, status)
345 1
        emit_event(self.controller, "deleted", evc_id=evc.id)
346
        return jsonify(result), status
347 1
348 1
    @rest("v2/evc/<circuit_id>/metadata", methods=["GET"])
349
    def get_metadata(self, circuit_id):
350
        """Get metadata from an EVC."""
351
        try:
352
            return (
353
                jsonify({"metadata": self.circuits[circuit_id].metadata}),
354
                200,
355 1
            )
356 1
        except KeyError as error:
357
            raise NotFound(f"circuit_id {circuit_id} not found.") from error
358 1
359 1
    @rest("v2/evc/<circuit_id>/metadata", methods=["POST"])
360 1
    def add_metadata(self, circuit_id):
361 1
        """Add metadata to an EVC."""
362 1
        try:
363 1
            metadata = request.get_json()
364 1
            content_type = request.content_type
365 1
        except BadRequest as error:
366 1
            result = "The request body is not a well-formed JSON."
367 1
            raise BadRequest(result) from error
368 1
        if content_type is None:
369 1
            result = "The request body is empty."
370
            raise BadRequest(result)
371
        if metadata is None:
372
            if content_type != "application/json":
373
                result = (
374
                    "The content type must be application/json "
375 1
                    f"(received {content_type})."
376
                )
377 1
            else:
378 1
                result = "Metadata is empty."
379 1
            raise UnsupportedMediaType(result)
380 1
381
        try:
382 1
            evc = self.circuits[circuit_id]
383 1
        except KeyError as error:
384 1
            raise NotFound(f"circuit_id {circuit_id} not found.") from error
385
386 1
        evc.extend_metadata(metadata)
387 1
        evc.sync()
388
        return jsonify("Operation successful"), 201
389 1
390 1
    @rest("v2/evc/<circuit_id>/metadata/<key>", methods=["DELETE"])
391 1
    def delete_metadata(self, circuit_id, key):
392 1
        """Delete metadata from an EVC."""
393
        try:
394 1
            evc = self.circuits[circuit_id]
395 1
        except KeyError as error:
396 1
            raise NotFound(f"circuit_id {circuit_id} not found.") from error
397
398 1
        evc.remove_metadata(key)
399 1
        evc.sync()
400
        return jsonify("Operation successful"), 200
401 1
402 1
    @rest("/v2/evc/<circuit_id>/redeploy", methods=["PATCH"])
403 1
    def redeploy(self, circuit_id):
404 1
        """Endpoint to force the redeployment of an EVC."""
405 1
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
406 1
        try:
407 1
            evc = self.circuits[circuit_id]
408 1
        except KeyError:
409 1
            result = f"circuit_id {circuit_id} not found"
410 1
            raise NotFound(result) from NotFound
411 1
        if evc.is_enabled():
412 1
            with evc.lock:
413
                evc.remove_current_flows()
414 1
                evc.deploy()
415 1
            result = {"response": f"Circuit {circuit_id} redeploy received."}
416
            status = 202
417 1
        else:
418
            result = {"response": f"Circuit {circuit_id} is disabled."}
419 1
            status = 409
420 1
421
        return jsonify(result), status
422
423
    @rest("/v2/evc/schedule", methods=["GET"])
424
    def list_schedules(self):
425
        """Endpoint to return all schedules stored for all circuits.
426
427
        Return a JSON with the following template:
428 1
        [{"schedule_id": <schedule_id>,
429 1
         "circuit_id": <circuit_id>,
430 1
         "schedule": <schedule object>}]
431 1
        """
432 1
        log.debug("list_schedules /v2/evc/schedule")
433 1
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
434 1
        if not circuits:
435
            result = {}
436 1
            status = 200
437 1
            return jsonify(result), status
438 1
439 1
        result = []
440 1
        status = 200
441 1
        for circuit in circuits:
442 1
            circuit_scheduler = circuit.get("circuit_scheduler")
443
            if circuit_scheduler:
444
                for scheduler in circuit_scheduler:
445
                    value = {
446
                        "schedule_id": scheduler.get("id"),
447 1
                        "circuit_id": circuit.get("id"),
448
                        "schedule": scheduler,
449 1
                    }
450 1
                    result.append(value)
451
452 1
        log.debug("list_schedules result %s %s", result, status)
453 1
        return jsonify(result), status
454
455
    @rest("/v2/evc/schedule/", methods=["POST"])
456
    def create_schedule(self):
457
        """
458
        Create a new schedule for a given circuit.
459
460
        This service do no check if there are conflicts with another schedule.
461
        Payload example:
462
            {
463
              "circuit_id":"aa:bb:cc",
464
              "schedule": {
465
                "date": "2019-08-07T14:52:10.967Z",
466
                "interval": "string",
467
                "frequency": "1 * * * *",
468
                "action": "create"
469 1
              }
470
            }
471 1
        """
472 1
        log.debug("create_schedule /v2/evc/schedule/")
473 1
474 1
        json_data = self._json_from_request("create_schedule")
475 1
        try:
476 1
            circuit_id = json_data["circuit_id"]
477 1
        except TypeError:
478 1
            result = "The payload should have a dictionary."
479 1
            log.debug("create_schedule result %s %s", result, 400)
480 1
            raise BadRequest(result) from BadRequest
481 1
        except KeyError:
482
            result = "Missing circuit_id."
483 1
            log.debug("create_schedule result %s %s", result, 400)
484 1
            raise BadRequest(result) from BadRequest
485 1
486 1
        try:
487 1
            schedule_data = json_data["schedule"]
488 1
        except KeyError:
489
            result = "Missing schedule data."
490
            log.debug("create_schedule result %s %s", result, 400)
491 1
            raise BadRequest(result) from BadRequest
492
493
        # Get EVC from circuits buffer
494 1
        circuits = self._get_circuits_buffer()
495
496
        # get the circuit
497 1
        evc = circuits.get(circuit_id)
498 1
499 1
        # get the circuit
500 1
        if not evc:
501
            result = f"circuit_id {circuit_id} not found"
502 1
            log.debug("create_schedule result %s %s", result, 404)
503 1
            raise NotFound(result) from NotFound
504 1
        # Can not modify circuits deleted and archived
505 1
        if evc.archived:
506
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
507
            log.debug("create_schedule result %s %s", result, 403)
508 1
            raise Forbidden(result) from Forbidden
509
510
        # new schedule from dict
511 1
        new_schedule = CircuitSchedule.from_dict(schedule_data)
512 1
513
        # If there is no schedule, create the list
514
        if not evc.circuit_scheduler:
515 1
            evc.circuit_scheduler = []
516
517
        # Add the new schedule
518 1
        evc.circuit_scheduler.append(new_schedule)
519
520
        # Add schedule job
521 1
        self.sched.add_circuit_job(evc, new_schedule)
522
523 1
        # save circuit to mongodb
524 1
        evc.sync()
525
526 1
        result = new_schedule.as_dict()
527 1
        status = 201
528
529 1
        log.debug("create_schedule result %s %s", result, status)
530 1
        return jsonify(result), status
531
532
    @rest("/v2/evc/schedule/<schedule_id>", methods=["PATCH"])
533
    def update_schedule(self, schedule_id):
534
        """Update a schedule.
535
536
        Change all attributes from the given schedule from a EVC circuit.
537
        The schedule ID is preserved as default.
538
        Payload example:
539
            {
540
              "date": "2019-08-07T14:52:10.967Z",
541
              "interval": "string",
542
              "frequency": "1 * * *",
543 1
              "action": "create"
544
            }
545
        """
546 1
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)
547
548
        # Try to find a circuit schedule
549 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
550 1
551 1
        # Can not modify circuits deleted and archived
552 1
        if not found_schedule:
553 1
            result = f"schedule_id {schedule_id} not found"
554 1
            log.debug("update_schedule result %s %s", result, 404)
555 1
            raise NotFound(result) from NotFound
556 1
        if evc.archived:
557
            result = f"Circuit {evc.id} is archived. Update is forbidden."
558 1
            log.debug("update_schedule result %s %s", result, 403)
559
            raise Forbidden(result) from Forbidden
560 1
561 1
        data = self._json_from_request("update_schedule")
562
563 1
        new_schedule = CircuitSchedule.from_dict(data)
564
        new_schedule.id = found_schedule.id
565 1
        # Remove the old schedule
566
        evc.circuit_scheduler.remove(found_schedule)
567
        # Append the modified schedule
568 1
        evc.circuit_scheduler.append(new_schedule)
569
570 1
        # Cancel all schedule jobs
571
        self.sched.cancel_job(found_schedule.id)
572 1
        # Add the new circuit schedule
573
        self.sched.add_circuit_job(evc, new_schedule)
574 1
        # Save EVC to mongodb
575 1
        evc.sync()
576
577 1
        result = new_schedule.as_dict()
578 1
        status = 200
579
580 1
        log.debug("update_schedule result %s %s", result, status)
581 1
        return jsonify(result), status
582
583
    @rest("/v2/evc/schedule/<schedule_id>", methods=["DELETE"])
584
    def delete_schedule(self, schedule_id):
585
        """Remove a circuit schedule.
586
587
        Remove the Schedule from EVC.
588 1
        Remove the Schedule from cron job.
589 1
        Save the EVC to the Storehouse.
590
        """
591
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
592 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
593 1
594 1
        # Can not modify circuits deleted and archived
595 1
        if not found_schedule:
596
            result = f"schedule_id {schedule_id} not found"
597 1
            log.debug("delete_schedule result %s %s", result, 404)
598 1
            raise NotFound(result)
599 1
600 1
        if evc.archived:
601
            result = f"Circuit {evc.id} is archived. Update is forbidden."
602
            log.debug("delete_schedule result %s %s", result, 403)
603 1
            raise Forbidden(result)
604
605
        # Remove the old schedule
606 1
        evc.circuit_scheduler.remove(found_schedule)
607
608 1
        # Cancel all schedule jobs
609
        self.sched.cancel_job(found_schedule.id)
610 1
        # Save EVC to mongodb
611 1
        evc.sync()
612
613 1
        result = "Schedule removed"
614 1
        status = 200
615
616 1
        log.debug("delete_schedule result %s %s", result, status)
617
        return jsonify(result), status
618
619
    def _is_duplicated_evc(self, evc):
620
        """Verify if the circuit given is duplicated with the stored evcs.
621
622
        Args:
623
            evc (EVC): circuit to be analysed.
624
625
        Returns:
626 1
            boolean: True if the circuit is duplicated, otherwise False.
627 1
628 1
        """
629 1
        for circuit in tuple(self.circuits.values()):
630
            if not circuit.archived and circuit.shares_uni(evc):
631 1
                return True
632 1
        return False
633
634
    @listen_to("kytos/topology.link_up")
635
    def on_link_up(self, event):
636 1
        """Change circuit when link is up or end_maintenance."""
637
        self.handle_link_up(event)
638 1
639 1
    def handle_link_up(self, event):
640 1
        """Change circuit when link is up or end_maintenance."""
641 1
        log.debug("Event handle_link_up %s", event)
642 1
        for evc in self.circuits.values():
643
            if evc.is_enabled() and not evc.archived:
644 1
                with evc.lock:
645 1
                    evc.handle_link_up(event.content["link"])
646
647
    @listen_to("kytos/topology.link_down")
648
    def on_link_down(self, event):
649 1
        """Change circuit when link is down or under_mantenance."""
650
        self.handle_link_down(event)
651 1
652 1
    @begin_span
653 1
    def handle_link_down(self, event):
654 1
        """Change circuit when link is down or under_mantenance."""
655 1
        link = event.content["link"]
656 1
        log.info("Event handle_link_down %s", link)
657 1
        switch_flows = {}
658
        evcs_with_failover = []
659
        evcs_normal = []
660
        check_failover = []
661
        transaction = execution_context.get_transaction()
662
        if transaction:
663
            transaction.begin_span("calc_affected_evcs", "custom")
664
        for evc in self.circuits.values():
665
            if evc.is_affected_by_link(link):
666
                # if there is no failover path, handles link down the
667
                # tradditional way
668
                if (
669 1
                    not getattr(evc, 'failover_path', None) or
670 1
                    evc.is_failover_path_affected_by_link(link)
671
                ):
672
                    evcs_normal.append(evc)
673
                    continue
674 1
                for dpid, flows in evc.get_failover_flows().items():
675
                    switch_flows.setdefault(dpid, [])
676 1
                    switch_flows[dpid].extend(flows)
677 1
                evcs_with_failover.append(evc)
678 1
            else:
679 1
                check_failover.append(evc)
680
        if transaction:
681 1
            transaction.end_span()
682
683
        ## TODO: handle exceptions
684
        #threads = []
685
        #for dpid, flows in switch_flows.items():
686
        #    threads.append(
687
        #        Thread(target=send_flow_mods, args=(dpid, flows, 'flows', False, "mef_eline.handle_link_down",))
688
        #    )
689
        #for thread in threads:
690
        #    thread.start()
691
        #for thread in threads:
692
        #    thread.join()
693
694
        for dpid in switch_flows:
695
            # TODO: test using rest
696
            emit_event(
697
                self.controller,
698
                context="kytos.flow_manager",
699
                name="flows.install",
700 1
                dpid=dpid,
701 1
                flow_dict={"flows": switch_flows[dpid]},
702
                log_info="mef_eline.handle_link_down",
703
            )
704
705 1
        # TODO: avoid concurrency
706
        time.sleep(5)
707 1
708 1
        for evc in evcs_with_failover:
709 1
            with evc.lock:
710
                #evc.remove_path_flows(evc.current_path, using_event=True)
711 1
                old_path = evc.current_path
712 1
                evc.current_path = evc.failover_path
713 1
                evc.failover_path = old_path
714
                evc.sync()
715 1
                # check_failover.append(evc.id)
716
            emit_event(self.controller, "redeployed_link_down", evc_id=evc.id)
717
            log.info(f"{evc} redeployed with failover due to link down {link.id}")
718
719
        for evc in evcs_normal:
720 1
            emit_event(
721 1
                self.controller,
722
                "evc_affected_by_link_down",
723
                evc_id=evc.id,
724 1
                link_id=link.id,
725 1
            )
726 1
727 1
        # After handling the hot path, check if new failover paths are needed.
728 1
        # Note that EVCs affected by link down will generate a KytosEvent for
729 1
        # deployed|redeployed, which will trigger the failover path setup.
730
        # Thus, we just need to further check the check_failover list
731 1
        for evc in check_failover:
732 1
            if evc.is_failover_path_affected_by_link(link):
733 1
                evc.setup_failover_path()
734 1
735
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
736
    def on_evc_affected_by_link_down(self, event):
737
        """Change circuit when link is down or under_mantenance."""
738
        self.handle_evc_affected_by_link_down(event)
739
740
    def handle_evc_affected_by_link_down(self, event):
741
        """Change circuit when link is down or under_mantenance."""
742 1
        evc = self.circuits.get(event.content["evc_id"])
743 1
        link_id = event.content['link_id']
744
        if not evc:
745
            return
746
        with evc.lock:
747
            result = evc.handle_link_down()
748
        event_name = "error_redeploy_link_down"
749
        if result:
750 1
            log.info(f"{evc} redeployed due to link down {link_id}")
751 1
            event_name = "redeployed_link_down"
752
        emit_event(self.controller, event_name, evc_id=evc.id)
753
754
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
755 1
    def on_evc_deployed(self, event):
756
        """Handle EVC deployed|redeployed_link_down."""
757 1
        self.handle_evc_deployed(event)
758 1
759 1
    def handle_evc_deployed(self, event):
760
        """Setup failover path on evc deployed."""
761 1
        evc = self.circuits.get(event.content["evc_id"])
762
        if not evc:
763
            return
764
        with evc.lock:
765
            evc.setup_failover_path()
766
767
    @listen_to("kytos/topology.topology_loaded")
768
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
769
        """Load EVCs once the topology is available."""
770
        self.load_all_evcs()
771
772
    def load_all_evcs(self):
773
        """Try to load all EVCs on startup."""
774
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
775
        for circuit_id, circuit in circuits:
776
            if circuit_id not in self.circuits:
777
                self._load_evc(circuit)
778
779
    def _load_evc(self, circuit_dict):
780
        """Load one EVC from mongodb to memory."""
781
        try:
782
            evc = self._evc_from_dict(circuit_dict)
783
        except ValueError as exception:
784 1
            log.error(
785
                f'Could not load EVC {circuit_dict["id"]} '
786 1
                f"because {exception}"
787 1
            )
788
            return None
789 1
790 1
        if evc.archived:
791
            return None
792 1
        evc.deactivate()
793 1
        evc.sync()
794
        self.circuits.setdefault(evc.id, evc)
795
        self.sched.add(evc)
796 1
        return evc
797 1
798
    @listen_to("kytos/flow_manager.flow.error")
799
    def on_flow_mod_error(self, event):
800
        """Handle flow mod errors related to an EVC."""
801
        self.handle_flow_mod_error(event)
802
803 1
    def handle_flow_mod_error(self, event):
804
        """Handle flow mod errors related to an EVC."""
805 1
        flow = event.content["flow"]
806
        command = event.content.get("error_command")
807
        if command != "add":
808
            return
809
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
810
        if evc:
811
            evc.remove_current_flows()
812 1
813 1
    def _evc_dict_with_instances(self, evc_dict):
814 1
        """Convert some dict values to instance of EVC classes.
815
816
        This method will convert: [UNI, Link]
817 1
        """
818 1
        data = evc_dict.copy()  # Do not modify the original dict
819 1
        for attribute, value in data.items():
820 1
            # Get multiple attributes.
821 1
            # Ex: uni_a, uni_z
822 1
            if "uni" in attribute:
823 1
                try:
824 1
                    data[attribute] = self._uni_from_dict(value)
825 1
                except ValueError as exception:
826
                    result = "Error creating UNI: Invalid value"
827 1
                    raise ValueError(result) from exception
828
829
            if attribute == "circuit_scheduler":
830
                data[attribute] = []
831
                for schedule in value:
832
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
833 1
834
            # Get multiple attributes.
835 1
            # Ex: primary_links,
836 1
            #     backup_links,
837 1
            #     current_links_cache,
838 1
            #     primary_links_cache,
839 1
            #     backup_links_cache
840
            if "links" in attribute:
841 1
                data[attribute] = [
842 1
                    self._link_from_dict(link) for link in value
843
                ]
844
845
            # Ex: current_path,
846
            #     primary_path,
847
            #     backup_path
848
            if "path" in attribute and attribute != "dynamic_backup_path":
849 1
                data[attribute] = Path(
850 1
                    [self._link_from_dict(link) for link in value]
851
                )
852
853
        return data
854
855
    def _evc_from_dict(self, evc_dict):
856
        data = self._evc_dict_with_instances(evc_dict)
857
        return EVC(self.controller, **data)
858
859 1
    def _uni_from_dict(self, uni_dict):
860 1
        """Return a UNI object from python dict."""
861 1
        if uni_dict is None:
862 1
            return False
863 1
864
        interface_id = uni_dict.get("interface_id")
865
        interface = self.controller.get_interface_by_id(interface_id)
866
        if interface is None:
867
            result = (
868
                "Error creating UNI:"
869
                + f"Could not instantiate interface {interface_id}"
870
            )
871
            raise ValueError(result) from ValueError
872
873
        tag_dict = uni_dict.get("tag", None)
874
        if tag_dict:
875
            tag = TAG.from_dict(tag_dict)
876
        else:
877
            tag = None
878
        uni = UNI(interface, tag)
879
880
        return uni
881
882
    def _link_from_dict(self, link_dict):
883
        """Return a Link object from python dict."""
884
        id_a = link_dict.get("endpoint_a").get("id")
885
        id_b = link_dict.get("endpoint_b").get("id")
886
887
        endpoint_a = self.controller.get_interface_by_id(id_a)
888
        endpoint_b = self.controller.get_interface_by_id(id_b)
889
890
        link = Link(endpoint_a, endpoint_b)
891
        if "metadata" in link_dict:
892
            link.extend_metadata(link_dict.get("metadata"))
893
894
        s_vlan = link.get_metadata("s_vlan")
895
        if s_vlan:
896
            tag = TAG.from_dict(s_vlan)
897
            if tag is False:
898
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
899
                raise ValueError(error_msg)
900
            link.update_metadata("s_vlan", tag)
901
        return link
902
903
    def _find_evc_by_schedule_id(self, schedule_id):
904
        """
905
        Find an EVC and CircuitSchedule based on schedule_id.
906
907
        :param schedule_id: Schedule ID
908
        :return: EVC and Schedule
909
        """
910
        circuits = self._get_circuits_buffer()
911
        found_schedule = None
912
        evc = None
913
914
        # pylint: disable=unused-variable
915
        for c_id, circuit in circuits.items():
916
            for schedule in circuit.circuit_scheduler:
917
                if schedule.id == schedule_id:
918
                    found_schedule = schedule
919
                    evc = circuit
920
                    break
921
            if found_schedule:
922
                break
923
        return evc, found_schedule
924
925
    def _get_circuits_buffer(self):
926
        """
927
        Return the circuit buffer.
928
929
        If the buffer is empty, try to load data from mongodb.
930
        """
931
        if not self.circuits:
932
            # Load circuits from mongodb to buffer
933
            circuits = self.mongo_controller.get_circuits()['circuits']
934
            for c_id, circuit in circuits.items():
935
                evc = self._evc_from_dict(circuit)
936
                self.circuits[c_id] = evc
937
        return self.circuits
938
939
    @staticmethod
940
    def _json_from_request(caller):
941
        """Return a json from request.
942
943
        If it was not possible to get a json from the request, log, for debug,
944
        who was the caller and the error that ocurred, and raise an
945
        Exception.
946
        """
947
        try:
948
            json_data = request.get_json()
949
        except ValueError as exception:
950
            log.error(exception)
951
            log.debug(f"{caller} result {exception} 400")
952
            raise BadRequest(str(exception)) from BadRequest
953
        except BadRequest:
954
            result = "The request is not a valid JSON."
955
            log.debug(f"{caller} result {result} 400")
956
            raise BadRequest(result) from BadRequest
957
        if json_data is None:
958
            result = "Content-Type must be application/json"
959
            log.debug(f"{caller} result {result} 415")
960
            raise UnsupportedMediaType(result)
961
        return json_data
962