Passed
Pull Request — master (#200)
by Vinicius
05:15 queued 01:58
created

build.main.Main.get_circuit()   A

Complexity

Conditions 2

Size

Total Lines 12
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 2

Importance

Changes 0
Metric Value
eloc 11
dl 0
loc 12
ccs 11
cts 11
cp 1
rs 9.85
c 0
b 0
f 0
cc 2
nop 2
crap 2
1
# pylint: disable=protected-access
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import time
7 1
from threading import Lock
8
9 1
from flask import jsonify, request
10 1
from werkzeug.exceptions import (BadRequest, Conflict, Forbidden,
11
                                 MethodNotAllowed, NotFound,
12
                                 UnsupportedMediaType)
13
14 1
from kytos.core import KytosNApp, log, rest
15 1
from kytos.core.events import KytosEvent
16 1
from kytos.core.helpers import listen_to
17 1
from kytos.core.interface import TAG, UNI
18 1
from kytos.core.link import Link
19 1
from napps.kytos.mef_eline import controllers, settings
20 1
from napps.kytos.mef_eline.exceptions import InvalidPath
21 1
from napps.kytos.mef_eline.models import EVC, DynamicPathManager, Path
22 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
23 1
from napps.kytos.mef_eline.utils import emit_event, load_spec, validate
24
25
26
# pylint: disable=too-many-public-methods
27 1
class Main(KytosNApp):
28
    """Main class of amlight/mef_eline NApp.
29
30
    This class is the entry point for this napp.
31
    """
32
33 1
    spec = load_spec()
34
35 1
    def setup(self):
36
        """Replace the '__init__' method for the KytosNApp subclass.
37
38
        The setup method is automatically called by the controller when your
39
        application is loaded.
40
41
        So, if you have any setup routine, insert it here.
42
        """
43
        # object used to scheduler circuit events
44 1
        self.sched = Scheduler()
45
46
        # object to save and load circuits
47 1
        self.mongo_controller = self.get_eline_controller()
48 1
        self.mongo_controller.bootstrap_indexes()
49
50
        # set the controller that will manager the dynamic paths
51 1
        DynamicPathManager.set_controller(self.controller)
52
53
        # dictionary of EVCs created. It acts as a circuit buffer.
54
        # Every create/update/delete must be synced to mongodb.
55 1
        self.circuits = {}
56
57 1
        self._lock = Lock()
58 1
        self.execution_rounds = -1
59 1
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)
60
61 1
        self.load_all_evcs()
62
63 1
    def get_evcs_by_svc_level(self) -> list:
64
        """Get circuits sorted by desc service level and asc creation_time.
65
66
        In the future, as more ops are offloaded it should be get from the DB.
67
        """
68 1
        return sorted(self.circuits.values(),
69
                      key=lambda x: (-x.service_level, x.creation_time))
70
71 1
    @staticmethod
72 1
    def get_eline_controller():
73
        """Return the ELineController instance."""
74
        return controllers.ELineController()
75
76 1
    def execute(self):
77
        """Execute once when the napp is running."""
78 1
        if self.execution_rounds < 0:
79
            self.execution_rounds += 1
80
            return
81 1
        if self._lock.locked():
82 1
            return
83 1
        log.debug("Starting consistency routine")
84 1
        with self._lock:
85 1
            self.execute_consistency()
86 1
        log.debug("Finished consistency routine")
87
88 1
    def execute_consistency(self):
89
        """Execute consistency routine."""
90 1
        self.execution_rounds += 1
91 1
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
92 1
        for circuit in self.get_evcs_by_svc_level():
93 1
            stored_circuits.pop(circuit.id, None)
94 1
            if (
95
                circuit.is_enabled()
96
                and not circuit.is_active()
97
                and not circuit.lock.locked()
98
            ):
99 1
                if circuit.check_traces():
100 1
                    log.info(f"{circuit} enabled but inactive - activating")
101 1
                    with circuit.lock:
102 1
                        circuit.activate()
103 1
                        circuit.sync()
104
                else:
105 1
                    if self.execution_rounds > settings.WAIT_FOR_OLD_PATH:
106 1
                        log.info(f"{circuit} enabled but inactive - redeploy")
107 1
                        with circuit.lock:
108 1
                            circuit.deploy()
109 1
        for circuit_id in stored_circuits:
110 1
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
111 1
            self._load_evc(stored_circuits[circuit_id])
112
113 1
    def shutdown(self):
114
        """Execute when your napp is unloaded.
115
116
        If you have some cleanup procedure, insert it here.
117
        """
118
119 1
    @rest("/v2/evc/", methods=["GET"])
120 1
    def list_circuits(self):
121
        """Endpoint to return circuits stored.
122
123
        archive query arg if defined (not null) will be filtered
124
        accordingly, by default only non archived evcs will be listed
125
        """
126 1
        log.debug("list_circuits /v2/evc")
127 1
        archived = request.args.get("archived", "false").lower()
128 1
        archived_to_optional = {"null": None, "true": True, "false": False}
129 1
        archived = archived_to_optional.get(archived, False)
130 1
        circuits = self.mongo_controller.get_circuits(archived=archived)
131 1
        circuits = circuits['circuits']
132 1
        return jsonify(circuits), 200
133
134 1
    @rest("/v2/evc/<circuit_id>", methods=["GET"])
135 1
    def get_circuit(self, circuit_id):
136
        """Endpoint to return a circuit based on id."""
137 1
        log.debug("get_circuit /v2/evc/%s", circuit_id)
138 1
        circuit = self.mongo_controller.get_circuit(circuit_id)
139 1
        if not circuit:
140 1
            result = f"circuit_id {circuit_id} not found"
141 1
            log.debug("get_circuit result %s %s", result, 404)
142 1
            raise NotFound(result)
143 1
        status = 200
144 1
        log.debug("get_circuit result %s %s", circuit, status)
145 1
        return jsonify(circuit), status
146
147 1
    @rest("/v2/evc/", methods=["POST"])
148 1
    @validate(spec)
149 1
    def create_circuit(self, data):
150
        """Try to create a new circuit.
151
152
        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
153
        UNI_Z's requested C-VID are available from the interfaces' pools. This
154
        is checked when creating the UNI object.
155
156
        Then, E-Line NApp requests a primary and a backup path to the
157
        Pathfinder NApp using the attributes primary_links and backup_links
158
        submitted via REST
159
160
        # For each link composing paths in #3:
161
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
162
        #  - Using the S-VID obtained, generate abstract flow entries to be
163
        #    sent to FlowManager
164
165
        Push abstract flow entries to FlowManager and FlowManager pushes
166
        OpenFlow entries to datapaths
167
168
        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
169
        creation
170
171
        Finnaly, notify user of the status of its request.
172
        """
173
        # Try to create the circuit object
174 1
        log.debug("create_circuit /v2/evc/")
175
176 1
        try:
177 1
            evc = self._evc_from_dict(data)
178 1
        except ValueError as exception:
179 1
            log.debug("create_circuit result %s %s", exception, 400)
180 1
            raise BadRequest(str(exception)) from BadRequest
181
182 1
        if evc.primary_path:
183
            try:
184
                evc.primary_path.is_valid(
185
                    evc.uni_a.interface.switch,
186
                    evc.uni_z.interface.switch,
187
                    bool(evc.circuit_scheduler),
188
                )
189
            except InvalidPath as exception:
190
                raise BadRequest(
191
                    f"primary_path is not valid: {exception}"
192
                ) from exception
193 1
        if evc.backup_path:
194
            try:
195
                evc.backup_path.is_valid(
196
                    evc.uni_a.interface.switch,
197
                    evc.uni_z.interface.switch,
198
                    bool(evc.circuit_scheduler),
199
                )
200
            except InvalidPath as exception:
201
                raise BadRequest(
202
                    f"backup_path is not valid: {exception}"
203
                ) from exception
204
205
        # verify duplicated evc
206 1
        if self._is_duplicated_evc(evc):
207 1
            result = "The EVC already exists."
208 1
            log.debug("create_circuit result %s %s", result, 409)
209 1
            raise Conflict(result)
210
211 1
        if (
212
            not evc.primary_path
213
            and evc.dynamic_backup_path is False
214
            and evc.uni_a.interface.switch != evc.uni_z.interface.switch
215
        ):
216 1
            result = "The EVC must have a primary path or allow dynamic paths."
217 1
            log.debug("create_circuit result %s %s", result, 400)
218 1
            raise BadRequest(result)
219
220
        # store circuit in dictionary
221 1
        self.circuits[evc.id] = evc
222
223
        # save circuit
224 1
        evc.sync()
225
226
        # Schedule the circuit deploy
227 1
        self.sched.add(evc)
228
229
        # Circuit has no schedule, deploy now
230 1
        if not evc.circuit_scheduler:
231 1
            with evc.lock:
232 1
                evc.deploy()
233
234
        # Notify users
235 1
        event = KytosEvent(
236
            name="kytos.mef_eline.created", content=evc.as_dict()
237
        )
238 1
        self.controller.buffers.app.put(event)
239
240 1
        result = {"circuit_id": evc.id}
241 1
        status = 201
242 1
        log.debug("create_circuit result %s %s", result, status)
243 1
        emit_event(self.controller, "created", evc_id=evc.id)
244 1
        return jsonify(result), status
245
246 1
    @rest("/v2/evc/<circuit_id>", methods=["PATCH"])
247 1
    def update(self, circuit_id):
248
        """Update a circuit based on payload.
249
250
        The EVC required attributes (name, uni_a, uni_z) can't be updated.
251
        """
252 1
        log.debug("update /v2/evc/%s", circuit_id)
253 1
        try:
254 1
            evc = self.circuits[circuit_id]
255 1
        except KeyError:
256 1
            result = f"circuit_id {circuit_id} not found"
257 1
            log.debug("update result %s %s", result, 404)
258 1
            raise NotFound(result) from NotFound
259
260 1
        if evc.archived:
261 1
            result = "Can't update archived EVC"
262 1
            log.debug("update result %s %s", result, 405)
263 1
            raise MethodNotAllowed(["GET"], result)
264
265 1
        try:
266 1
            data = request.get_json()
267 1
        except BadRequest:
268 1
            result = "The request body is not a well-formed JSON."
269 1
            log.debug("update result %s %s", result, 400)
270 1
            raise BadRequest(result) from BadRequest
271 1
        if data is None:
272 1
            result = "The request body mimetype is not application/json."
273 1
            log.debug("update result %s %s", result, 415)
274 1
            raise UnsupportedMediaType(result) from UnsupportedMediaType
275
276 1
        try:
277 1
            enable, redeploy = evc.update(
278
                **self._evc_dict_with_instances(data)
279
            )
280 1
        except ValueError as exception:
281 1
            log.error(exception)
282 1
            log.debug("update result %s %s", exception, 400)
283 1
            raise BadRequest(str(exception)) from BadRequest
284
285 1
        if evc.is_active():
286
            if enable is False:  # disable if active
287
                with evc.lock:
288
                    evc.remove()
289
            elif redeploy is not None:  # redeploy if active
290
                with evc.lock:
291
                    evc.remove()
292
                    evc.deploy()
293
        else:
294 1
            if enable is True:  # enable if inactive
295 1
                with evc.lock:
296 1
                    evc.deploy()
297 1
        result = {evc.id: evc.as_dict()}
298 1
        status = 200
299
300 1
        log.debug("update result %s %s", result, status)
301 1
        emit_event(self.controller, "updated", evc_id=evc.id, data=data)
302 1
        return jsonify(result), status
303
304 1
    @rest("/v2/evc/<circuit_id>", methods=["DELETE"])
305 1
    def delete_circuit(self, circuit_id):
306
        """Remove a circuit.
307
308
        First, the flows are removed from the switches, and then the EVC is
309
        disabled.
310
        """
311 1
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
312 1
        try:
313 1
            evc = self.circuits[circuit_id]
314 1
        except KeyError:
315 1
            result = f"circuit_id {circuit_id} not found"
316 1
            log.debug("delete_circuit result %s %s", result, 404)
317 1
            raise NotFound(result) from NotFound
318
319 1
        if evc.archived:
320 1
            result = f"Circuit {circuit_id} already removed"
321 1
            log.debug("delete_circuit result %s %s", result, 404)
322 1
            raise NotFound(result) from NotFound
323
324 1
        log.info("Removing %s", evc)
325 1
        with evc.lock:
326 1
            evc.remove_current_flows()
327 1
            evc.deactivate()
328 1
            evc.disable()
329 1
            self.sched.remove(evc)
330 1
            evc.archive()
331 1
            evc.sync()
332 1
        log.info("EVC removed. %s", evc)
333 1
        result = {"response": f"Circuit {circuit_id} removed"}
334 1
        status = 200
335
336 1
        log.debug("delete_circuit result %s %s", result, status)
337 1
        emit_event(self.controller, "deleted", evc_id=evc.id)
338 1
        return jsonify(result), status
339
340 1
    @rest("v2/evc/<circuit_id>/metadata", methods=["GET"])
341 1
    def get_metadata(self, circuit_id):
342
        """Get metadata from an EVC."""
343 1
        try:
344 1
            return (
345
                jsonify({"metadata": self.circuits[circuit_id].metadata}),
346
                200,
347
            )
348
        except KeyError as error:
349
            raise NotFound(f"circuit_id {circuit_id} not found.") from error
350
351 1
    @rest("v2/evc/<circuit_id>/metadata", methods=["POST"])
352 1
    def add_metadata(self, circuit_id):
353
        """Add metadata to an EVC."""
354 1
        try:
355 1
            metadata = request.get_json()
356 1
            content_type = request.content_type
357 1
        except BadRequest as error:
358 1
            result = "The request body is not a well-formed JSON."
359 1
            raise BadRequest(result) from error
360 1
        if content_type is None:
361 1
            result = "The request body is empty."
362 1
            raise BadRequest(result)
363 1
        if metadata is None:
364 1
            if content_type != "application/json":
365 1
                result = (
366
                    "The content type must be application/json "
367
                    f"(received {content_type})."
368
                )
369
            else:
370
                result = "Metadata is empty."
371 1
            raise UnsupportedMediaType(result)
372
373 1
        try:
374 1
            evc = self.circuits[circuit_id]
375 1
        except KeyError as error:
376 1
            raise NotFound(f"circuit_id {circuit_id} not found.") from error
377
378 1
        evc.extend_metadata(metadata)
379 1
        evc.sync()
380 1
        return jsonify("Operation successful"), 201
381
382 1
    @rest("v2/evc/<circuit_id>/metadata/<key>", methods=["DELETE"])
383 1
    def delete_metadata(self, circuit_id, key):
384
        """Delete metadata from an EVC."""
385 1
        try:
386 1
            evc = self.circuits[circuit_id]
387 1
        except KeyError as error:
388 1
            raise NotFound(f"circuit_id {circuit_id} not found.") from error
389
390 1
        evc.remove_metadata(key)
391 1
        evc.sync()
392 1
        return jsonify("Operation successful"), 200
393
394 1
    @rest("/v2/evc/<circuit_id>/redeploy", methods=["PATCH"])
395 1
    def redeploy(self, circuit_id):
396
        """Endpoint to force the redeployment of an EVC."""
397 1
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
398 1
        try:
399 1
            evc = self.circuits[circuit_id]
400 1
        except KeyError:
401 1
            result = f"circuit_id {circuit_id} not found"
402 1
            raise NotFound(result) from NotFound
403 1
        if evc.is_enabled():
404 1
            with evc.lock:
405 1
                evc.remove_current_flows()
406 1
                evc.deploy()
407 1
            result = {"response": f"Circuit {circuit_id} redeploy received."}
408 1
            status = 202
409
        else:
410 1
            result = {"response": f"Circuit {circuit_id} is disabled."}
411 1
            status = 409
412
413 1
        return jsonify(result), status
414
415 1
    @rest("/v2/evc/schedule", methods=["GET"])
416 1
    def list_schedules(self):
417
        """Endpoint to return all schedules stored for all circuits.
418
419
        Return a JSON with the following template:
420
        [{"schedule_id": <schedule_id>,
421
         "circuit_id": <circuit_id>,
422
         "schedule": <schedule object>}]
423
        """
424 1
        log.debug("list_schedules /v2/evc/schedule")
425 1
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
426 1
        if not circuits:
427 1
            result = {}
428 1
            status = 200
429 1
            return jsonify(result), status
430
431 1
        result = []
432 1
        status = 200
433 1
        for circuit in circuits:
434 1
            circuit_scheduler = circuit.get("circuit_scheduler")
435 1
            if circuit_scheduler:
436 1
                for scheduler in circuit_scheduler:
437 1
                    value = {
438
                        "schedule_id": scheduler.get("id"),
439
                        "circuit_id": circuit.get("id"),
440
                        "schedule": scheduler,
441
                    }
442 1
                    result.append(value)
443
444 1
        log.debug("list_schedules result %s %s", result, status)
445 1
        return jsonify(result), status
446
447 1
    @rest("/v2/evc/schedule/", methods=["POST"])
448 1
    def create_schedule(self):
449
        """
450
        Create a new schedule for a given circuit.
451
452
        This service do no check if there are conflicts with another schedule.
453
        Payload example:
454
            {
455
              "circuit_id":"aa:bb:cc",
456
              "schedule": {
457
                "date": "2019-08-07T14:52:10.967Z",
458
                "interval": "string",
459
                "frequency": "1 * * * *",
460
                "action": "create"
461
              }
462
            }
463
        """
464 1
        log.debug("create_schedule /v2/evc/schedule/")
465
466 1
        json_data = self._json_from_request("create_schedule")
467 1
        try:
468 1
            circuit_id = json_data["circuit_id"]
469 1
        except TypeError:
470 1
            result = "The payload should have a dictionary."
471 1
            log.debug("create_schedule result %s %s", result, 400)
472 1
            raise BadRequest(result) from BadRequest
473 1
        except KeyError:
474 1
            result = "Missing circuit_id."
475 1
            log.debug("create_schedule result %s %s", result, 400)
476 1
            raise BadRequest(result) from BadRequest
477
478 1
        try:
479 1
            schedule_data = json_data["schedule"]
480 1
        except KeyError:
481 1
            result = "Missing schedule data."
482 1
            log.debug("create_schedule result %s %s", result, 400)
483 1
            raise BadRequest(result) from BadRequest
484
485
        # Get EVC from circuits buffer
486 1
        circuits = self._get_circuits_buffer()
487
488
        # get the circuit
489 1
        evc = circuits.get(circuit_id)
490
491
        # get the circuit
492 1
        if not evc:
493 1
            result = f"circuit_id {circuit_id} not found"
494 1
            log.debug("create_schedule result %s %s", result, 404)
495 1
            raise NotFound(result) from NotFound
496
        # Can not modify circuits deleted and archived
497 1
        if evc.archived:
498 1
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
499 1
            log.debug("create_schedule result %s %s", result, 403)
500 1
            raise Forbidden(result) from Forbidden
501
502
        # new schedule from dict
503 1
        new_schedule = CircuitSchedule.from_dict(schedule_data)
504
505
        # If there is no schedule, create the list
506 1
        if not evc.circuit_scheduler:
507 1
            evc.circuit_scheduler = []
508
509
        # Add the new schedule
510 1
        evc.circuit_scheduler.append(new_schedule)
511
512
        # Add schedule job
513 1
        self.sched.add_circuit_job(evc, new_schedule)
514
515
        # save circuit to mongodb
516 1
        evc.sync()
517
518 1
        result = new_schedule.as_dict()
519 1
        status = 201
520
521 1
        log.debug("create_schedule result %s %s", result, status)
522 1
        return jsonify(result), status
523
524 1
    @rest("/v2/evc/schedule/<schedule_id>", methods=["PATCH"])
525 1
    def update_schedule(self, schedule_id):
526
        """Update a schedule.
527
528
        Change all attributes from the given schedule from a EVC circuit.
529
        The schedule ID is preserved as default.
530
        Payload example:
531
            {
532
              "date": "2019-08-07T14:52:10.967Z",
533
              "interval": "string",
534
              "frequency": "1 * * *",
535
              "action": "create"
536
            }
537
        """
538 1
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)
539
540
        # Try to find a circuit schedule
541 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
542
543
        # Can not modify circuits deleted and archived
544 1
        if not found_schedule:
545 1
            result = f"schedule_id {schedule_id} not found"
546 1
            log.debug("update_schedule result %s %s", result, 404)
547 1
            raise NotFound(result) from NotFound
548 1
        if evc.archived:
549 1
            result = f"Circuit {evc.id} is archived. Update is forbidden."
550 1
            log.debug("update_schedule result %s %s", result, 403)
551 1
            raise Forbidden(result) from Forbidden
552
553 1
        data = self._json_from_request("update_schedule")
554
555 1
        new_schedule = CircuitSchedule.from_dict(data)
556 1
        new_schedule.id = found_schedule.id
557
        # Remove the old schedule
558 1
        evc.circuit_scheduler.remove(found_schedule)
559
        # Append the modified schedule
560 1
        evc.circuit_scheduler.append(new_schedule)
561
562
        # Cancel all schedule jobs
563 1
        self.sched.cancel_job(found_schedule.id)
564
        # Add the new circuit schedule
565 1
        self.sched.add_circuit_job(evc, new_schedule)
566
        # Save EVC to mongodb
567 1
        evc.sync()
568
569 1
        result = new_schedule.as_dict()
570 1
        status = 200
571
572 1
        log.debug("update_schedule result %s %s", result, status)
573 1
        return jsonify(result), status
574
575 1
    @rest("/v2/evc/schedule/<schedule_id>", methods=["DELETE"])
576 1
    def delete_schedule(self, schedule_id):
577
        """Remove a circuit schedule.
578
579
        Remove the Schedule from EVC.
580
        Remove the Schedule from cron job.
581
        Save the EVC to the Storehouse.
582
        """
583 1
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
584 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
585
586
        # Can not modify circuits deleted and archived
587 1
        if not found_schedule:
588 1
            result = f"schedule_id {schedule_id} not found"
589 1
            log.debug("delete_schedule result %s %s", result, 404)
590 1
            raise NotFound(result)
591
592 1
        if evc.archived:
593 1
            result = f"Circuit {evc.id} is archived. Update is forbidden."
594 1
            log.debug("delete_schedule result %s %s", result, 403)
595 1
            raise Forbidden(result)
596
597
        # Remove the old schedule
598 1
        evc.circuit_scheduler.remove(found_schedule)
599
600
        # Cancel all schedule jobs
601 1
        self.sched.cancel_job(found_schedule.id)
602
        # Save EVC to mongodb
603 1
        evc.sync()
604
605 1
        result = "Schedule removed"
606 1
        status = 200
607
608 1
        log.debug("delete_schedule result %s %s", result, status)
609 1
        return jsonify(result), status
610
611 1
    def _is_duplicated_evc(self, evc):
612
        """Verify if the circuit given is duplicated with the stored evcs.
613
614
        Args:
615
            evc (EVC): circuit to be analysed.
616
617
        Returns:
618
            boolean: True if the circuit is duplicated, otherwise False.
619
620
        """
621 1
        for circuit in tuple(self.circuits.values()):
622 1
            if not circuit.archived and circuit.shares_uni(evc):
623 1
                return True
624 1
        return False
625
626 1
    @listen_to("kytos/topology.link_up")
627 1
    def on_link_up(self, event):
628
        """Change circuit when link is up or end_maintenance."""
629
        self.handle_link_up(event)
630
631 1
    def handle_link_up(self, event):
632
        """Change circuit when link is up or end_maintenance."""
633 1
        log.debug("Event handle_link_up %s", event)
634 1
        for evc in self.get_evcs_by_svc_level():
635 1
            if evc.is_enabled() and not evc.archived:
636 1
                with evc.lock:
637 1
                    evc.handle_link_up(event.content["link"])
638
639 1
    @listen_to("kytos/topology.link_down")
640 1
    def on_link_down(self, event):
641
        """Change circuit when link is down or under_mantenance."""
642
        self.handle_link_down(event)
643
644 1
    def handle_link_down(self, event):
645
        """Change circuit when link is down or under_mantenance."""
646 1
        link = event.content["link"]
647 1
        log.info("Event handle_link_down %s", link)
648 1
        switch_flows = {}
649 1
        evcs_with_failover = []
650 1
        evcs_normal = []
651 1
        check_failover = []
652 1
        for evc in self.get_evcs_by_svc_level():
653 1
            if evc.is_affected_by_link(link):
654
                # if there is no failover path, handles link down the
655
                # tradditional way
656 1
                if (
657
                    not getattr(evc, 'failover_path', None) or
658
                    evc.is_failover_path_affected_by_link(link)
659
                ):
660 1
                    evcs_normal.append(evc)
661 1
                    continue
662 1
                for dpid, flows in evc.get_failover_flows().items():
663 1
                    switch_flows.setdefault(dpid, [])
664 1
                    switch_flows[dpid].extend(flows)
665 1
                evcs_with_failover.append(evc)
666
            else:
667 1
                check_failover.append(evc)
668
669 1
        offset = 0
670 1
        while switch_flows:
671 1
            offset = (offset + settings.BATCH_SIZE) or None
672 1
            switches = list(switch_flows.keys())
673 1
            for dpid in switches:
674 1
                emit_event(
675
                    self.controller,
676
                    context="kytos.flow_manager",
677
                    name="flows.install",
678
                    dpid=dpid,
679
                    flow_dict={"flows": switch_flows[dpid][:offset]},
680
                )
681 1
                if offset is None or offset >= len(switch_flows[dpid]):
682 1
                    del switch_flows[dpid]
683 1
                    continue
684 1
                switch_flows[dpid] = switch_flows[dpid][offset:]
685 1
            time.sleep(settings.BATCH_INTERVAL)
686
687 1
        for evc in evcs_with_failover:
688 1
            with evc.lock:
689 1
                old_path = evc.current_path
690 1
                evc.current_path = evc.failover_path
691 1
                evc.failover_path = old_path
692 1
                evc.sync()
693 1
            emit_event(self.controller, "redeployed_link_down", evc_id=evc.id)
694 1
            log.info(
695
                f"{evc} redeployed with failover due to link down {link.id}"
696
            )
697
698 1
        for evc in evcs_normal:
699 1
            emit_event(
700
                self.controller,
701
                "evc_affected_by_link_down",
702
                evc_id=evc.id,
703
                link_id=link.id,
704
            )
705
706
        # After handling the hot path, check if new failover paths are needed.
707
        # Note that EVCs affected by link down will generate a KytosEvent for
708
        # deployed|redeployed, which will trigger the failover path setup.
709
        # Thus, we just need to further check the check_failover list
710 1
        for evc in check_failover:
711 1
            if evc.is_failover_path_affected_by_link(link):
712 1
                evc.setup_failover_path()
713
714 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
715 1
    def on_evc_affected_by_link_down(self, event):
716
        """Change circuit when link is down or under_mantenance."""
717
        self.handle_evc_affected_by_link_down(event)
718
719 1
    def handle_evc_affected_by_link_down(self, event):
720
        """Change circuit when link is down or under_mantenance."""
721 1
        evc = self.circuits.get(event.content["evc_id"])
722 1
        link_id = event.content['link_id']
723 1
        if not evc:
724 1
            return
725 1
        with evc.lock:
726 1
            result = evc.handle_link_down()
727 1
        event_name = "error_redeploy_link_down"
728 1
        if result:
729 1
            log.info(f"{evc} redeployed due to link down {link_id}")
730 1
            event_name = "redeployed_link_down"
731 1
        emit_event(self.controller, event_name, evc_id=evc.id)
732
733 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
734 1
    def on_evc_deployed(self, event):
735
        """Handle EVC deployed|redeployed_link_down."""
736
        self.handle_evc_deployed(event)
737
738 1
    def handle_evc_deployed(self, event):
739
        """Setup failover path on evc deployed."""
740 1
        evc = self.circuits.get(event.content["evc_id"])
741 1
        if not evc:
742 1
            return
743 1
        with evc.lock:
744 1
            evc.setup_failover_path()
745
746 1
    @listen_to("kytos/topology.topology_loaded")
747 1
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
748
        """Load EVCs once the topology is available."""
749
        self.load_all_evcs()
750
751 1
    def load_all_evcs(self):
752
        """Try to load all EVCs on startup."""
753 1
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
754 1
        for circuit_id, circuit in circuits:
755 1
            if circuit_id not in self.circuits:
756 1
                self._load_evc(circuit)
757
758 1
    def _load_evc(self, circuit_dict):
759
        """Load one EVC from mongodb to memory."""
760 1
        try:
761 1
            evc = self._evc_from_dict(circuit_dict)
762 1
        except ValueError as exception:
763 1
            log.error(
764
                f"Could not load EVC: dict={circuit_dict} error={exception}"
765
            )
766 1
            return None
767
768 1
        if evc.archived:
769 1
            return None
770 1
        evc.deactivate()
771 1
        evc.sync()
772 1
        self.circuits.setdefault(evc.id, evc)
773 1
        self.sched.add(evc)
774 1
        return evc
775
776 1
    @listen_to("kytos/flow_manager.flow.error")
777 1
    def on_flow_mod_error(self, event):
778
        """Handle flow mod errors related to an EVC."""
779
        self.handle_flow_mod_error(event)
780
781 1
    def handle_flow_mod_error(self, event):
782
        """Handle flow mod errors related to an EVC."""
783 1
        flow = event.content["flow"]
784 1
        command = event.content.get("error_command")
785 1
        if command != "add":
786
            return
787 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
788 1
        if evc:
789 1
            evc.remove_current_flows()
790
791 1
    def _evc_dict_with_instances(self, evc_dict):
792
        """Convert some dict values to instance of EVC classes.
793
794
        This method will convert: [UNI, Link]
795
        """
796 1
        data = evc_dict.copy()  # Do not modify the original dict
797 1
        for attribute, value in data.items():
798
            # Get multiple attributes.
799
            # Ex: uni_a, uni_z
800 1
            if "uni" in attribute:
801 1
                try:
802 1
                    data[attribute] = self._uni_from_dict(value)
803 1
                except ValueError as exception:
804 1
                    result = "Error creating UNI: Invalid value"
805 1
                    raise ValueError(result) from exception
806
807 1
            if attribute == "circuit_scheduler":
808 1
                data[attribute] = []
809 1
                for schedule in value:
810 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
811
812
            # Get multiple attributes.
813
            # Ex: primary_links,
814
            #     backup_links,
815
            #     current_links_cache,
816
            #     primary_links_cache,
817
            #     backup_links_cache
818 1
            if "links" in attribute:
819 1
                data[attribute] = [
820
                    self._link_from_dict(link) for link in value
821
                ]
822
823
            # Ex: current_path,
824
            #     primary_path,
825
            #     backup_path
826 1
            if "path" in attribute and attribute != "dynamic_backup_path":
827 1
                data[attribute] = Path(
828
                    [self._link_from_dict(link) for link in value]
829
                )
830
831 1
        return data
832
833 1
    def _evc_from_dict(self, evc_dict):
834 1
        data = self._evc_dict_with_instances(evc_dict)
835 1
        return EVC(self.controller, **data)
836
837 1
    def _uni_from_dict(self, uni_dict):
838
        """Return a UNI object from python dict."""
839 1
        if uni_dict is None:
840 1
            return False
841
842 1
        interface_id = uni_dict.get("interface_id")
843 1
        interface = self.controller.get_interface_by_id(interface_id)
844 1
        if interface is None:
845 1
            result = (
846
                "Error creating UNI:"
847
                + f"Could not instantiate interface {interface_id}"
848
            )
849 1
            raise ValueError(result) from ValueError
850
851 1
        tag_dict = uni_dict.get("tag", None)
852 1
        if tag_dict:
853 1
            tag = TAG.from_dict(tag_dict)
854
        else:
855 1
            tag = None
856 1
        uni = UNI(interface, tag)
857
858 1
        return uni
859
860 1
    def _link_from_dict(self, link_dict):
861
        """Return a Link object from python dict."""
862 1
        id_a = link_dict.get("endpoint_a").get("id")
863 1
        id_b = link_dict.get("endpoint_b").get("id")
864
865 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
866 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
867
868 1
        link = Link(endpoint_a, endpoint_b)
869 1
        if "metadata" in link_dict:
870
            link.extend_metadata(link_dict.get("metadata"))
871
872 1
        s_vlan = link.get_metadata("s_vlan")
873 1
        if s_vlan:
874
            tag = TAG.from_dict(s_vlan)
875
            if tag is False:
876
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
877
                raise ValueError(error_msg)
878
            link.update_metadata("s_vlan", tag)
879 1
        return link
880
881 1
    def _find_evc_by_schedule_id(self, schedule_id):
882
        """
883
        Find an EVC and CircuitSchedule based on schedule_id.
884
885
        :param schedule_id: Schedule ID
886
        :return: EVC and Schedule
887
        """
888 1
        circuits = self._get_circuits_buffer()
889 1
        found_schedule = None
890 1
        evc = None
891
892
        # pylint: disable=unused-variable
893 1
        for c_id, circuit in circuits.items():
894 1
            for schedule in circuit.circuit_scheduler:
895 1
                if schedule.id == schedule_id:
896 1
                    found_schedule = schedule
897 1
                    evc = circuit
898 1
                    break
899 1
            if found_schedule:
900 1
                break
901 1
        return evc, found_schedule
902
903 1
    def _get_circuits_buffer(self):
904
        """
905
        Return the circuit buffer.
906
907
        If the buffer is empty, try to load data from mongodb.
908
        """
909 1
        if not self.circuits:
910
            # Load circuits from mongodb to buffer
911 1
            circuits = self.mongo_controller.get_circuits()['circuits']
912 1
            for c_id, circuit in circuits.items():
913 1
                evc = self._evc_from_dict(circuit)
914 1
                self.circuits[c_id] = evc
915 1
        return self.circuits
916
917 1
    @staticmethod
918 1
    def _json_from_request(caller):
919
        """Return a json from request.
920
921
        If it was not possible to get a json from the request, log, for debug,
922
        who was the caller and the error that ocurred, and raise an
923
        Exception.
924
        """
925 1
        try:
926 1
            json_data = request.get_json()
927
        except ValueError as exception:
928
            log.error(exception)
929
            log.debug(f"{caller} result {exception} 400")
930
            raise BadRequest(str(exception)) from BadRequest
931
        except BadRequest:
932
            result = "The request is not a valid JSON."
933
            log.debug(f"{caller} result {result} 400")
934
            raise BadRequest(result) from BadRequest
935 1
        if json_data is None:
936 1
            result = "Content-Type must be application/json"
937 1
            log.debug(f"{caller} result {result} 415")
938 1
            raise UnsupportedMediaType(result)
939
        return json_data
940