Test Failed
Pull Request — master (#232)
by Vinicius
06:58
created

build.main.Main.on_evc_affected_by_link_down()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1.037

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 4
ccs 2
cts 3
cp 0.6667
rs 10
c 0
b 0
f 0
cc 1
nop 2
crap 1.037
1
# pylint: disable=protected-access
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import time
7 1
from threading import Lock
8
9 1
from flask import jsonify, request
10 1
from werkzeug.exceptions import (BadRequest, Conflict, Forbidden,
11
                                 MethodNotAllowed, NotFound,
12
                                 UnsupportedMediaType)
13
14 1
from kytos.core import KytosNApp, log, rest
15 1
from kytos.core.events import KytosEvent
16 1
from kytos.core.helpers import listen_to
17 1
from kytos.core.interface import TAG, UNI
18 1
from kytos.core.link import Link
19 1
from napps.kytos.mef_eline import controllers, settings
20 1
from napps.kytos.mef_eline.exceptions import InvalidPath
21 1
from napps.kytos.mef_eline.models import EVC, DynamicPathManager, Path
22 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
23 1
from napps.kytos.mef_eline.utils import emit_event, load_spec, validate
24
25
26
# pylint: disable=too-many-public-methods
27 1
class Main(KytosNApp):
28
    """Main class of amlight/mef_eline NApp.
29
30
    This class is the entry point for this napp.
31
    """
32
33 1
    spec = load_spec()
34
35 1
    def setup(self):
36
        """Replace the '__init__' method for the KytosNApp subclass.
37
38
        The setup method is automatically called by the controller when your
39
        application is loaded.
40
41
        So, if you have any setup routine, insert it here.
42
        """
43
        # object used to scheduler circuit events
44 1
        self.sched = Scheduler()
45
46
        # object to save and load circuits
47 1
        self.mongo_controller = self.get_eline_controller()
48 1
        self.mongo_controller.bootstrap_indexes()
49
50
        # set the controller that will manager the dynamic paths
51 1
        DynamicPathManager.set_controller(self.controller)
52
53
        # dictionary of EVCs created. It acts as a circuit buffer.
54
        # Every create/update/delete must be synced to mongodb.
55 1
        self.circuits = {}
56
57 1
        self._lock = Lock()
58 1
        self.execution_rounds = -1
59 1
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)
60
61 1
        self.load_all_evcs()
62
63 1
    def get_evcs_by_svc_level(self) -> list:
64
        """Get circuits sorted by desc service level and asc creation_time.
65
66
        In the future, as more ops are offloaded it should be get from the DB.
67
        """
68 1
        return sorted(self.circuits.values(),
69
                      key=lambda x: (-x.service_level, x.creation_time))
70
71 1
    @staticmethod
72 1
    def get_eline_controller():
73
        """Return the ELineController instance."""
74
        return controllers.ELineController()
75
76 1
    def execute(self):
77
        """Execute once when the napp is running."""
78 1
        if self.execution_rounds < 0:
79
            self.execution_rounds += 1
80
            return
81 1
        if self._lock.locked():
82 1
            return
83 1
        log.debug("Starting consistency routine")
84 1
        with self._lock:
85 1
            self.execute_consistency()
86 1
        log.debug("Finished consistency routine")
87
88 1
    def execute_consistency(self):
89
        """Execute consistency routine."""
90 1
        self.execution_rounds += 1
91 1
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
92 1
        for circuit in self.get_evcs_by_svc_level():
93 1
            stored_circuits.pop(circuit.id, None)
94 1
            if (
95
                circuit.is_enabled()
96
                and not circuit.is_active()
97
                and not circuit.lock.locked()
98
            ):
99 1
                if circuit.check_traces():
100 1
                    log.info(f"{circuit} enabled but inactive - activating")
101 1
                    with circuit.lock:
102 1
                        circuit.activate()
103 1
                        circuit.sync()
104
                else:
105 1
                    if self.execution_rounds > settings.WAIT_FOR_OLD_PATH:
106 1
                        log.info(f"{circuit} enabled but inactive - redeploy")
107 1
                        with circuit.lock:
108 1
                            circuit.deploy()
109 1
        for circuit_id in stored_circuits:
110 1
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
111 1
            self._load_evc(stored_circuits[circuit_id])
112
113 1
    def shutdown(self):
114
        """Execute when your napp is unloaded.
115
116
        If you have some cleanup procedure, insert it here.
117
        """
118
119 1
    @rest("/v2/evc/", methods=["GET"])
120 1
    def list_circuits(self):
121
        """Endpoint to return circuits stored.
122
123
        archive query arg if defined (not null) will be filtered
124
        accordingly, by default only non archived evcs will be listed
125
        """
126 1
        log.debug("list_circuits /v2/evc")
127 1
        archived = request.args.get("archived", "false").lower()
128 1
        archived_to_optional = {"null": None, "true": True, "false": False}
129 1
        archived = archived_to_optional.get(archived, False)
130 1
        circuits = self.mongo_controller.get_circuits(archived=archived)
131 1
        circuits = circuits['circuits']
132 1
        return jsonify(circuits), 200
133
134 1
    @rest("/v2/evc/<circuit_id>", methods=["GET"])
135 1
    def get_circuit(self, circuit_id):
136
        """Endpoint to return a circuit based on id."""
137 1
        log.debug("get_circuit /v2/evc/%s", circuit_id)
138 1
        circuit = self.mongo_controller.get_circuit(circuit_id)
139 1
        if not circuit:
140 1
            result = f"circuit_id {circuit_id} not found"
141 1
            log.debug("get_circuit result %s %s", result, 404)
142 1
            raise NotFound(result)
143 1
        status = 200
144 1
        log.debug("get_circuit result %s %s", circuit, status)
145 1
        return jsonify(circuit), status
146
147 1
    @rest("/v2/evc/", methods=["POST"])
148 1
    @validate(spec)
149 1
    def create_circuit(self, data):
150
        """Try to create a new circuit.
151
152
        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
153
        UNI_Z's requested C-VID are available from the interfaces' pools. This
154
        is checked when creating the UNI object.
155
156
        Then, E-Line NApp requests a primary and a backup path to the
157
        Pathfinder NApp using the attributes primary_links and backup_links
158
        submitted via REST
159
160
        # For each link composing paths in #3:
161
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
162
        #  - Using the S-VID obtained, generate abstract flow entries to be
163
        #    sent to FlowManager
164
165
        Push abstract flow entries to FlowManager and FlowManager pushes
166
        OpenFlow entries to datapaths
167
168
        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
169
        creation
170
171
        Finnaly, notify user of the status of its request.
172
        """
173
        # Try to create the circuit object
174 1
        log.debug("create_circuit /v2/evc/")
175
176 1
        try:
177 1
            evc = self._evc_from_dict(data)
178 1
        except ValueError as exception:
179 1
            log.debug("create_circuit result %s %s", exception, 400)
180 1
            raise BadRequest(str(exception)) from BadRequest
181
182 1
        if evc.primary_path:
183
            try:
184
                evc.primary_path.is_valid(
185
                    evc.uni_a.interface.switch,
186
                    evc.uni_z.interface.switch,
187
                    bool(evc.circuit_scheduler),
188
                )
189
            except InvalidPath as exception:
190
                raise BadRequest(
191
                    f"primary_path is not valid: {exception}"
192
                ) from exception
193 1
        if evc.backup_path:
194
            try:
195
                evc.backup_path.is_valid(
196
                    evc.uni_a.interface.switch,
197
                    evc.uni_z.interface.switch,
198
                    bool(evc.circuit_scheduler),
199
                )
200
            except InvalidPath as exception:
201
                raise BadRequest(
202
                    f"backup_path is not valid: {exception}"
203
                ) from exception
204
205
        # verify duplicated evc
206 1
        if self._is_duplicated_evc(evc):
207 1
            result = "The EVC already exists."
208 1
            log.debug("create_circuit result %s %s", result, 409)
209 1
            raise Conflict(result)
210
211 1
        if (
212
            not evc.primary_path
213
            and evc.dynamic_backup_path is False
214
            and evc.uni_a.interface.switch != evc.uni_z.interface.switch
215
        ):
216 1
            result = "The EVC must have a primary path or allow dynamic paths."
217 1
            log.debug("create_circuit result %s %s", result, 400)
218 1
            raise BadRequest(result)
219
220
        # store circuit in dictionary
221 1
        self.circuits[evc.id] = evc
222
223
        # save circuit
224 1
        evc.sync()
225
226
        # Schedule the circuit deploy
227 1
        self.sched.add(evc)
228
229
        # Circuit has no schedule, deploy now
230 1
        if not evc.circuit_scheduler:
231 1
            with evc.lock:
232 1
                evc.deploy()
233
234
        # Notify users
235 1
        event = KytosEvent(
236
            name="kytos.mef_eline.created", content=evc.as_dict()
237
        )
238 1
        self.controller.buffers.app.put(event)
239
240 1
        result = {"circuit_id": evc.id}
241 1
        status = 201
242 1
        log.debug("create_circuit result %s %s", result, status)
243 1
        emit_event(self.controller, "created", evc_id=evc.id)
244 1
        return jsonify(result), status
245
246 1
    @rest("/v2/evc/<circuit_id>", methods=["PATCH"])
247 1
    def update(self, circuit_id):
248
        """Update a circuit based on payload.
249
250
        The EVC required attributes (name, uni_a, uni_z) can't be updated.
251
        """
252 1
        log.debug("update /v2/evc/%s", circuit_id)
253 1
        try:
254 1
            evc = self.circuits[circuit_id]
255 1
        except KeyError:
256 1
            result = f"circuit_id {circuit_id} not found"
257 1
            log.debug("update result %s %s", result, 404)
258 1
            raise NotFound(result) from NotFound
259
260 1
        if evc.archived:
261 1
            result = "Can't update archived EVC"
262 1
            log.debug("update result %s %s", result, 405)
263 1
            raise MethodNotAllowed(["GET"], result)
264
265 1
        try:
266 1
            data = request.get_json()
267 1
        except BadRequest:
268 1
            result = "The request body is not a well-formed JSON."
269 1
            log.debug("update result %s %s", result, 400)
270 1
            raise BadRequest(result) from BadRequest
271 1
        if data is None:
272 1
            result = "The request body mimetype is not application/json."
273 1
            log.debug("update result %s %s", result, 415)
274 1
            raise UnsupportedMediaType(result) from UnsupportedMediaType
275
276 1
        try:
277 1
            enable, redeploy = evc.update(
278
                **self._evc_dict_with_instances(data)
279
            )
280 1
        except ValueError as exception:
281 1
            log.error(exception)
282 1
            log.debug("update result %s %s", exception, 400)
283 1
            raise BadRequest(str(exception)) from BadRequest
284
285 1
        if evc.is_active():
286
            if enable is False:  # disable if active
287
                with evc.lock:
288
                    evc.remove()
289
            elif redeploy is not None:  # redeploy if active
290
                with evc.lock:
291
                    evc.remove()
292
                    evc.deploy()
293
        else:
294 1
            if enable is True:  # enable if inactive
295 1
                with evc.lock:
296 1
                    evc.deploy()
297 1
        result = {evc.id: evc.as_dict()}
298 1
        status = 200
299
300 1
        log.debug("update result %s %s", result, status)
301 1
        emit_event(self.controller, "updated", evc_id=evc.id, data=data)
302 1
        return jsonify(result), status
303
304 1
    @rest("/v2/evc/<circuit_id>", methods=["DELETE"])
305 1
    def delete_circuit(self, circuit_id):
306
        """Remove a circuit.
307
308
        First, the flows are removed from the switches, and then the EVC is
309
        disabled.
310
        """
311 1
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
312 1
        try:
313 1
            evc = self.circuits[circuit_id]
314 1
        except KeyError:
315 1
            result = f"circuit_id {circuit_id} not found"
316 1
            log.debug("delete_circuit result %s %s", result, 404)
317 1
            raise NotFound(result) from NotFound
318
319 1
        if evc.archived:
320 1
            result = f"Circuit {circuit_id} already removed"
321 1
            log.debug("delete_circuit result %s %s", result, 404)
322 1
            raise NotFound(result) from NotFound
323
324 1
        log.info("Removing %s", evc)
325 1
        with evc.lock:
326 1
            evc.remove_current_flows()
327 1
            evc.remove_failover_flows(sync=False)
328 1
            evc.deactivate()
329 1
            evc.disable()
330 1
            self.sched.remove(evc)
331 1
            evc.archive()
332 1
            evc.sync()
333 1
        log.info("EVC removed. %s", evc)
334 1
        result = {"response": f"Circuit {circuit_id} removed"}
335 1
        status = 200
336
337 1
        log.debug("delete_circuit result %s %s", result, status)
338 1
        emit_event(self.controller, "deleted", evc_id=evc.id)
339 1
        return jsonify(result), status
340
341 1
    @rest("v2/evc/<circuit_id>/metadata", methods=["GET"])
342 1
    def get_metadata(self, circuit_id):
343
        """Get metadata from an EVC."""
344 1
        try:
345 1
            return (
346
                jsonify({"metadata": self.circuits[circuit_id].metadata}),
347
                200,
348
            )
349
        except KeyError as error:
350
            raise NotFound(f"circuit_id {circuit_id} not found.") from error
351
352 1
    @rest("v2/evc/<circuit_id>/metadata", methods=["POST"])
353 1
    def add_metadata(self, circuit_id):
354
        """Add metadata to an EVC."""
355 1
        try:
356 1
            metadata = request.get_json()
357 1
            content_type = request.content_type
358 1
        except BadRequest as error:
359 1
            result = "The request body is not a well-formed JSON."
360 1
            raise BadRequest(result) from error
361 1
        if content_type is None:
362 1
            result = "The request body is empty."
363 1
            raise BadRequest(result)
364 1
        if metadata is None:
365 1
            if content_type != "application/json":
366 1
                result = (
367
                    "The content type must be application/json "
368
                    f"(received {content_type})."
369
                )
370
            else:
371
                result = "Metadata is empty."
372 1
            raise UnsupportedMediaType(result)
373
374 1
        try:
375 1
            evc = self.circuits[circuit_id]
376 1
        except KeyError as error:
377 1
            raise NotFound(f"circuit_id {circuit_id} not found.") from error
378
379 1
        evc.extend_metadata(metadata)
380 1
        evc.sync()
381 1
        return jsonify("Operation successful"), 201
382
383 1
    @rest("v2/evc/<circuit_id>/metadata/<key>", methods=["DELETE"])
384 1
    def delete_metadata(self, circuit_id, key):
385
        """Delete metadata from an EVC."""
386 1
        try:
387 1
            evc = self.circuits[circuit_id]
388 1
        except KeyError as error:
389 1
            raise NotFound(f"circuit_id {circuit_id} not found.") from error
390
391 1
        evc.remove_metadata(key)
392 1
        evc.sync()
393 1
        return jsonify("Operation successful"), 200
394
395 1
    @rest("/v2/evc/<circuit_id>/redeploy", methods=["PATCH"])
396 1
    def redeploy(self, circuit_id):
397
        """Endpoint to force the redeployment of an EVC."""
398 1
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
399 1
        try:
400 1
            evc = self.circuits[circuit_id]
401 1
        except KeyError:
402 1
            result = f"circuit_id {circuit_id} not found"
403 1
            raise NotFound(result) from NotFound
404 1
        if evc.is_enabled():
405 1
            with evc.lock:
406 1
                evc.remove_current_flows()
407 1
                evc.deploy()
408 1
            result = {"response": f"Circuit {circuit_id} redeploy received."}
409 1
            status = 202
410
        else:
411 1
            result = {"response": f"Circuit {circuit_id} is disabled."}
412 1
            status = 409
413
414 1
        return jsonify(result), status
415
416 1
    @rest("/v2/evc/schedule", methods=["GET"])
417 1
    def list_schedules(self):
418
        """Endpoint to return all schedules stored for all circuits.
419
420
        Return a JSON with the following template:
421
        [{"schedule_id": <schedule_id>,
422
         "circuit_id": <circuit_id>,
423
         "schedule": <schedule object>}]
424
        """
425 1
        log.debug("list_schedules /v2/evc/schedule")
426 1
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
427 1
        if not circuits:
428 1
            result = {}
429 1
            status = 200
430 1
            return jsonify(result), status
431
432 1
        result = []
433 1
        status = 200
434 1
        for circuit in circuits:
435 1
            circuit_scheduler = circuit.get("circuit_scheduler")
436 1
            if circuit_scheduler:
437 1
                for scheduler in circuit_scheduler:
438 1
                    value = {
439
                        "schedule_id": scheduler.get("id"),
440
                        "circuit_id": circuit.get("id"),
441
                        "schedule": scheduler,
442
                    }
443 1
                    result.append(value)
444
445 1
        log.debug("list_schedules result %s %s", result, status)
446 1
        return jsonify(result), status
447
448 1
    @rest("/v2/evc/schedule/", methods=["POST"])
449 1
    def create_schedule(self):
450
        """
451
        Create a new schedule for a given circuit.
452
453
        This service do no check if there are conflicts with another schedule.
454
        Payload example:
455
            {
456
              "circuit_id":"aa:bb:cc",
457
              "schedule": {
458
                "date": "2019-08-07T14:52:10.967Z",
459
                "interval": "string",
460
                "frequency": "1 * * * *",
461
                "action": "create"
462
              }
463
            }
464
        """
465 1
        log.debug("create_schedule /v2/evc/schedule/")
466
467 1
        json_data = self._json_from_request("create_schedule")
468 1
        try:
469 1
            circuit_id = json_data["circuit_id"]
470 1
        except TypeError:
471 1
            result = "The payload should have a dictionary."
472 1
            log.debug("create_schedule result %s %s", result, 400)
473 1
            raise BadRequest(result) from BadRequest
474 1
        except KeyError:
475 1
            result = "Missing circuit_id."
476 1
            log.debug("create_schedule result %s %s", result, 400)
477 1
            raise BadRequest(result) from BadRequest
478
479 1
        try:
480 1
            schedule_data = json_data["schedule"]
481 1
        except KeyError:
482 1
            result = "Missing schedule data."
483 1
            log.debug("create_schedule result %s %s", result, 400)
484 1
            raise BadRequest(result) from BadRequest
485
486
        # Get EVC from circuits buffer
487 1
        circuits = self._get_circuits_buffer()
488
489
        # get the circuit
490 1
        evc = circuits.get(circuit_id)
491
492
        # get the circuit
493 1
        if not evc:
494 1
            result = f"circuit_id {circuit_id} not found"
495 1
            log.debug("create_schedule result %s %s", result, 404)
496 1
            raise NotFound(result) from NotFound
497
        # Can not modify circuits deleted and archived
498 1
        if evc.archived:
499 1
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
500 1
            log.debug("create_schedule result %s %s", result, 403)
501 1
            raise Forbidden(result) from Forbidden
502
503
        # new schedule from dict
504 1
        new_schedule = CircuitSchedule.from_dict(schedule_data)
505
506
        # If there is no schedule, create the list
507 1
        if not evc.circuit_scheduler:
508 1
            evc.circuit_scheduler = []
509
510
        # Add the new schedule
511 1
        evc.circuit_scheduler.append(new_schedule)
512
513
        # Add schedule job
514 1
        self.sched.add_circuit_job(evc, new_schedule)
515
516
        # save circuit to mongodb
517 1
        evc.sync()
518
519 1
        result = new_schedule.as_dict()
520 1
        status = 201
521
522 1
        log.debug("create_schedule result %s %s", result, status)
523 1
        return jsonify(result), status
524
525 1
    @rest("/v2/evc/schedule/<schedule_id>", methods=["PATCH"])
526 1
    def update_schedule(self, schedule_id):
527
        """Update a schedule.
528
529
        Change all attributes from the given schedule from a EVC circuit.
530
        The schedule ID is preserved as default.
531
        Payload example:
532
            {
533
              "date": "2019-08-07T14:52:10.967Z",
534
              "interval": "string",
535
              "frequency": "1 * * *",
536
              "action": "create"
537
            }
538
        """
539 1
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)
540
541
        # Try to find a circuit schedule
542 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
543
544
        # Can not modify circuits deleted and archived
545 1
        if not found_schedule:
546 1
            result = f"schedule_id {schedule_id} not found"
547 1
            log.debug("update_schedule result %s %s", result, 404)
548 1
            raise NotFound(result) from NotFound
549 1
        if evc.archived:
550 1
            result = f"Circuit {evc.id} is archived. Update is forbidden."
551 1
            log.debug("update_schedule result %s %s", result, 403)
552 1
            raise Forbidden(result) from Forbidden
553
554 1
        data = self._json_from_request("update_schedule")
555
556 1
        new_schedule = CircuitSchedule.from_dict(data)
557 1
        new_schedule.id = found_schedule.id
558
        # Remove the old schedule
559 1
        evc.circuit_scheduler.remove(found_schedule)
560
        # Append the modified schedule
561 1
        evc.circuit_scheduler.append(new_schedule)
562
563
        # Cancel all schedule jobs
564 1
        self.sched.cancel_job(found_schedule.id)
565
        # Add the new circuit schedule
566 1
        self.sched.add_circuit_job(evc, new_schedule)
567
        # Save EVC to mongodb
568 1
        evc.sync()
569
570 1
        result = new_schedule.as_dict()
571 1
        status = 200
572
573 1
        log.debug("update_schedule result %s %s", result, status)
574 1
        return jsonify(result), status
575
576 1
    @rest("/v2/evc/schedule/<schedule_id>", methods=["DELETE"])
577 1
    def delete_schedule(self, schedule_id):
578
        """Remove a circuit schedule.
579
580
        Remove the Schedule from EVC.
581
        Remove the Schedule from cron job.
582
        Save the EVC to the Storehouse.
583
        """
584 1
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
585 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
586
587
        # Can not modify circuits deleted and archived
588 1
        if not found_schedule:
589 1
            result = f"schedule_id {schedule_id} not found"
590 1
            log.debug("delete_schedule result %s %s", result, 404)
591 1
            raise NotFound(result)
592
593 1
        if evc.archived:
594 1
            result = f"Circuit {evc.id} is archived. Update is forbidden."
595 1
            log.debug("delete_schedule result %s %s", result, 403)
596 1
            raise Forbidden(result)
597
598
        # Remove the old schedule
599 1
        evc.circuit_scheduler.remove(found_schedule)
600
601
        # Cancel all schedule jobs
602 1
        self.sched.cancel_job(found_schedule.id)
603
        # Save EVC to mongodb
604 1
        evc.sync()
605
606 1
        result = "Schedule removed"
607 1
        status = 200
608
609 1
        log.debug("delete_schedule result %s %s", result, status)
610 1
        return jsonify(result), status
611
612 1
    def _is_duplicated_evc(self, evc):
613
        """Verify if the circuit given is duplicated with the stored evcs.
614
615
        Args:
616
            evc (EVC): circuit to be analysed.
617
618
        Returns:
619
            boolean: True if the circuit is duplicated, otherwise False.
620
621
        """
622 1
        for circuit in tuple(self.circuits.values()):
623 1
            if not circuit.archived and circuit.shares_uni(evc):
624 1
                return True
625 1
        return False
626
627 1
    @listen_to("kytos/topology.link_up")
628 1
    def on_link_up(self, event):
629
        """Change circuit when link is up or end_maintenance."""
630
        self.handle_link_up(event)
631
632 1
    def handle_link_up(self, event):
633
        """Change circuit when link is up or end_maintenance."""
634 1
        log.debug("Event handle_link_up %s", event)
635 1
        for evc in self.get_evcs_by_svc_level():
636 1
            if evc.is_enabled() and not evc.archived:
637 1
                with evc.lock:
638 1
                    evc.handle_link_up(event.content["link"])
639
640 1
    @listen_to("kytos/topology.link_down")
641 1
    def on_link_down(self, event):
642
        """Change circuit when link is down or under_mantenance."""
643
        self.handle_link_down(event)
644
645 1
    def handle_link_down(self, event):
646
        """Change circuit when link is down or under_mantenance."""
647 1
        link = event.content["link"]
648 1
        log.info("Event handle_link_down %s", link)
649 1
        switch_flows = {}
650 1
        evcs_with_failover = []
651 1
        evcs_normal = []
652 1
        check_failover = []
653 1
        for evc in self.get_evcs_by_svc_level():
654 1
            if evc.is_affected_by_link(link):
655
                # if there is no failover path, handles link down the
656
                # tradditional way
657 1
                if (
658
                    not getattr(evc, 'failover_path', None) or
659
                    evc.is_failover_path_affected_by_link(link)
660
                ):
661 1
                    evcs_normal.append(evc)
662 1
                    continue
663 1
                for dpid, flows in evc.get_failover_flows().items():
664 1
                    switch_flows.setdefault(dpid, [])
665 1
                    switch_flows[dpid].extend(flows)
666 1
                evcs_with_failover.append(evc)
667
            else:
668 1
                check_failover.append(evc)
669
670 1
        offset = 0
671 1
        while switch_flows:
672 1
            offset = (offset + settings.BATCH_SIZE) or None
673 1
            switches = list(switch_flows.keys())
674 1
            for dpid in switches:
675 1
                emit_event(
676
                    self.controller,
677
                    context="kytos.flow_manager",
678
                    name="flows.install",
679
                    dpid=dpid,
680
                    flow_dict={"flows": switch_flows[dpid][:offset]},
681
                )
682 1
                if offset is None or offset >= len(switch_flows[dpid]):
683 1
                    del switch_flows[dpid]
684 1
                    continue
685 1
                switch_flows[dpid] = switch_flows[dpid][offset:]
686 1
            time.sleep(settings.BATCH_INTERVAL)
687
688 1
        for evc in evcs_with_failover:
689 1
            with evc.lock:
690 1
                old_path = evc.current_path
691 1
                evc.current_path = evc.failover_path
692 1
                evc.failover_path = old_path
693 1
                evc.sync()
694 1
            emit_event(self.controller, "redeployed_link_down", evc_id=evc.id)
695 1
            log.info(
696
                f"{evc} redeployed with failover due to link down {link.id}"
697
            )
698
699 1
        for evc in evcs_normal:
700 1
            emit_event(
701
                self.controller,
702
                "evc_affected_by_link_down",
703
                evc_id=evc.id,
704
                link_id=link.id,
705
            )
706
707
        # After handling the hot path, check if new failover paths are needed.
708
        # Note that EVCs affected by link down will generate a KytosEvent for
709
        # deployed|redeployed, which will trigger the failover path setup.
710
        # Thus, we just need to further check the check_failover list
711 1
        for evc in check_failover:
712 1
            if evc.is_failover_path_affected_by_link(link):
713 1
                evc.setup_failover_path()
714
715 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
716 1
    def on_evc_affected_by_link_down(self, event):
717
        """Change circuit when link is down or under_mantenance."""
718
        self.handle_evc_affected_by_link_down(event)
719
720 1
    def handle_evc_affected_by_link_down(self, event):
721
        """Change circuit when link is down or under_mantenance."""
722 1
        evc = self.circuits.get(event.content["evc_id"])
723 1
        link_id = event.content['link_id']
724 1
        if not evc:
725 1
            return
726 1
        with evc.lock:
727 1
            result = evc.handle_link_down()
728 1
        event_name = "error_redeploy_link_down"
729 1
        if result:
730 1
            log.info(f"{evc} redeployed due to link down {link_id}")
731 1
            event_name = "redeployed_link_down"
732 1
        emit_event(self.controller, event_name, evc_id=evc.id)
733
734 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
735 1
    def on_evc_deployed(self, event):
736
        """Handle EVC deployed|redeployed_link_down."""
737
        self.handle_evc_deployed(event)
738
739 1
    def handle_evc_deployed(self, event):
740
        """Setup failover path on evc deployed."""
741 1
        evc = self.circuits.get(event.content["evc_id"])
742 1
        if not evc:
743 1
            return
744 1
        with evc.lock:
745 1
            evc.setup_failover_path()
746
747 1
    @listen_to("kytos/topology.topology_loaded")
748 1
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
749
        """Load EVCs once the topology is available."""
750
        self.load_all_evcs()
751
752 1
    def load_all_evcs(self):
753
        """Try to load all EVCs on startup."""
754 1
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
755 1
        for circuit_id, circuit in circuits:
756 1
            if circuit_id not in self.circuits:
757 1
                self._load_evc(circuit)
758
759 1
    def _load_evc(self, circuit_dict):
760
        """Load one EVC from mongodb to memory."""
761 1
        try:
762 1
            evc = self._evc_from_dict(circuit_dict)
763 1
        except ValueError as exception:
764 1
            log.error(
765
                f"Could not load EVC: dict={circuit_dict} error={exception}"
766
            )
767 1
            return None
768
769 1
        if evc.archived:
770 1
            return None
771 1
        evc.deactivate()
772 1
        evc.sync()
773 1
        self.circuits.setdefault(evc.id, evc)
774 1
        self.sched.add(evc)
775 1
        return evc
776
777 1
    @listen_to("kytos/flow_manager.flow.error")
778 1
    def on_flow_mod_error(self, event):
779
        """Handle flow mod errors related to an EVC."""
780
        self.handle_flow_mod_error(event)
781
782 1
    def handle_flow_mod_error(self, event):
783
        """Handle flow mod errors related to an EVC."""
784 1
        flow = event.content["flow"]
785 1
        command = event.content.get("error_command")
786 1
        if command != "add":
787
            return
788 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
789 1
        if evc:
790 1
            evc.remove_current_flows()
791
792 1
    def _evc_dict_with_instances(self, evc_dict):
793
        """Convert some dict values to instance of EVC classes.
794
795
        This method will convert: [UNI, Link]
796
        """
797 1
        data = evc_dict.copy()  # Do not modify the original dict
798 1
        for attribute, value in data.items():
799
            # Get multiple attributes.
800
            # Ex: uni_a, uni_z
801 1
            if "uni" in attribute:
802 1
                try:
803 1
                    data[attribute] = self._uni_from_dict(value)
804 1
                except ValueError as exception:
805 1
                    result = "Error creating UNI: Invalid value"
806 1
                    raise ValueError(result) from exception
807
808 1
            if attribute == "circuit_scheduler":
809 1
                data[attribute] = []
810 1
                for schedule in value:
811 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
812
813
            # Get multiple attributes.
814
            # Ex: primary_links,
815
            #     backup_links,
816
            #     current_links_cache,
817
            #     primary_links_cache,
818
            #     backup_links_cache
819 1
            if "links" in attribute:
820 1
                data[attribute] = [
821
                    self._link_from_dict(link) for link in value
822
                ]
823
824
            # Ex: current_path,
825
            #     primary_path,
826
            #     backup_path
827 1
            if "path" in attribute and attribute != "dynamic_backup_path":
828 1
                data[attribute] = Path(
829
                    [self._link_from_dict(link) for link in value]
830
                )
831
832 1
        return data
833
834 1
    def _evc_from_dict(self, evc_dict):
835 1
        data = self._evc_dict_with_instances(evc_dict)
836 1
        return EVC(self.controller, **data)
837
838 1
    def _uni_from_dict(self, uni_dict):
839
        """Return a UNI object from python dict."""
840 1
        if uni_dict is None:
841 1
            return False
842
843 1
        interface_id = uni_dict.get("interface_id")
844 1
        interface = self.controller.get_interface_by_id(interface_id)
845 1
        if interface is None:
846 1
            result = (
847
                "Error creating UNI:"
848
                + f"Could not instantiate interface {interface_id}"
849
            )
850 1
            raise ValueError(result) from ValueError
851
852 1
        tag_dict = uni_dict.get("tag", None)
853 1
        if tag_dict:
854 1
            tag = TAG.from_dict(tag_dict)
855
        else:
856 1
            tag = None
857 1
        uni = UNI(interface, tag)
858
859 1
        return uni
860
861 1
    def _link_from_dict(self, link_dict):
862
        """Return a Link object from python dict."""
863 1
        id_a = link_dict.get("endpoint_a").get("id")
864 1
        id_b = link_dict.get("endpoint_b").get("id")
865
866 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
867 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
868
        if not endpoint_a:
869 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
870 1
            raise ValueError(error_msg)
871
        if not endpoint_b:
872
            error_msg = f"Could not get interface endpoint_b id {id_b}"
873 1
            raise ValueError(error_msg)
874 1
875
        link = Link(endpoint_a, endpoint_b)
876
        if "metadata" in link_dict:
877
            link.extend_metadata(link_dict.get("metadata"))
878
879
        s_vlan = link.get_metadata("s_vlan")
880 1
        if s_vlan:
881
            tag = TAG.from_dict(s_vlan)
882 1
            if tag is False:
883
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
884
                raise ValueError(error_msg)
885
            link.update_metadata("s_vlan", tag)
886
        return link
887
888
    def _find_evc_by_schedule_id(self, schedule_id):
889 1
        """
890 1
        Find an EVC and CircuitSchedule based on schedule_id.
891 1
892
        :param schedule_id: Schedule ID
893
        :return: EVC and Schedule
894 1
        """
895 1
        circuits = self._get_circuits_buffer()
896 1
        found_schedule = None
897 1
        evc = None
898 1
899 1
        # pylint: disable=unused-variable
900 1
        for c_id, circuit in circuits.items():
901 1
            for schedule in circuit.circuit_scheduler:
902 1
                if schedule.id == schedule_id:
903
                    found_schedule = schedule
904 1
                    evc = circuit
905
                    break
906
            if found_schedule:
907
                break
908
        return evc, found_schedule
909
910 1
    def _get_circuits_buffer(self):
911
        """
912 1
        Return the circuit buffer.
913 1
914 1
        If the buffer is empty, try to load data from mongodb.
915 1
        """
916 1
        if not self.circuits:
917
            # Load circuits from mongodb to buffer
918 1
            circuits = self.mongo_controller.get_circuits()['circuits']
919 1
            for c_id, circuit in circuits.items():
920
                evc = self._evc_from_dict(circuit)
921
                self.circuits[c_id] = evc
922
        return self.circuits
923
924
    @staticmethod
925
    def _json_from_request(caller):
926 1
        """Return a json from request.
927 1
928
        If it was not possible to get a json from the request, log, for debug,
929
        who was the caller and the error that ocurred, and raise an
930
        Exception.
931
        """
932
        try:
933
            json_data = request.get_json()
934
        except ValueError as exception:
935
            log.error(exception)
936 1
            log.debug(f"{caller} result {exception} 400")
937 1
            raise BadRequest(str(exception)) from BadRequest
938 1
        except BadRequest:
939 1
            result = "The request is not a valid JSON."
940 1
            log.debug(f"{caller} result {result} 400")
941
            raise BadRequest(result) from BadRequest
942
        if json_data is None:
943
            result = "Content-Type must be application/json"
944
            log.debug(f"{caller} result {result} 415")
945
            raise UnsupportedMediaType(result)
946
        return json_data
947