Passed
Pull Request — master (#233)
by Vinicius
08:40 queued 05:16
created

build.main.Main.handle_evc_affected_by_link_down()   A

Complexity

Conditions 4

Size

Total Lines 13
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 12
CRAP Score 4

Importance

Changes 0
Metric Value
eloc 12
dl 0
loc 13
ccs 12
cts 12
cp 1
rs 9.8
c 0
b 0
f 0
cc 4
nop 2
crap 4
1
# pylint: disable=protected-access
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import time
7 1
from threading import Lock
8
9 1
from flask import jsonify, request
10 1
from werkzeug.exceptions import (BadRequest, Conflict, Forbidden,
11
                                 MethodNotAllowed, NotFound,
12
                                 UnsupportedMediaType)
13
14 1
from kytos.core import KytosNApp, log, rest
15 1
from kytos.core.events import KytosEvent
16 1
from kytos.core.helpers import listen_to
17 1
from kytos.core.interface import TAG, UNI
18 1
from kytos.core.link import Link
19 1
from napps.kytos.mef_eline import controllers, settings
20 1
from napps.kytos.mef_eline.exceptions import InvalidPath
21 1
from napps.kytos.mef_eline.models import EVC, DynamicPathManager, Path
22 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
23 1
from napps.kytos.mef_eline.utils import emit_event, load_spec, validate
24
25
26
# pylint: disable=too-many-public-methods
27 1
class Main(KytosNApp):
28
    """Main class of amlight/mef_eline NApp.
29
30
    This class is the entry point for this napp.
31
    """
32
33 1
    spec = load_spec()
34
35 1
    def setup(self):
36
        """Replace the '__init__' method for the KytosNApp subclass.
37
38
        The setup method is automatically called by the controller when your
39
        application is loaded.
40
41
        So, if you have any setup routine, insert it here.
42
        """
43
        # object used to scheduler circuit events
44 1
        self.sched = Scheduler()
45
46
        # object to save and load circuits
47 1
        self.mongo_controller = self.get_eline_controller()
48 1
        self.mongo_controller.bootstrap_indexes()
49
50
        # set the controller that will manager the dynamic paths
51 1
        DynamicPathManager.set_controller(self.controller)
52
53
        # dictionary of EVCs created. It acts as a circuit buffer.
54
        # Every create/update/delete must be synced to mongodb.
55 1
        self.circuits = {}
56
57 1
        self._lock = Lock()
58 1
        self.execution_rounds = -1
59 1
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)
60
61 1
        self.load_all_evcs()
62
63 1
    def get_evcs_by_svc_level(self) -> list:
64
        """Get circuits sorted by desc service level and asc creation_time.
65
66
        In the future, as more ops are offloaded it should be get from the DB.
67
        """
68 1
        return sorted(self.circuits.values(),
69
                      key=lambda x: (-x.service_level, x.creation_time))
70
71 1
    @staticmethod
72 1
    def get_eline_controller():
73
        """Return the ELineController instance."""
74
        return controllers.ELineController()
75
76 1
    def execute(self):
77
        """Execute once when the napp is running."""
78 1
        if self.execution_rounds < 0:
79
            self.execution_rounds += 1
80
            return
81 1
        if self._lock.locked():
82 1
            return
83 1
        log.debug("Starting consistency routine")
84 1
        with self._lock:
85 1
            self.execute_consistency()
86 1
        log.debug("Finished consistency routine")
87
88 1
    def execute_consistency(self):
89
        """Execute consistency routine."""
90 1
        self.execution_rounds += 1
91 1
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
92 1
        for circuit in self.get_evcs_by_svc_level():
93 1
            stored_circuits.pop(circuit.id, None)
94 1
            if (
95
                circuit.is_enabled()
96
                and not circuit.is_active()
97
                and not circuit.lock.locked()
98
            ):
99 1
                if circuit.check_traces():
100 1
                    log.info(f"{circuit} enabled but inactive - activating")
101 1
                    with circuit.lock:
102 1
                        circuit.activate()
103 1
                        circuit.sync()
104
                else:
105 1
                    if self.execution_rounds > settings.WAIT_FOR_OLD_PATH:
106 1
                        log.info(f"{circuit} enabled but inactive - redeploy")
107 1
                        with circuit.lock:
108 1
                            circuit.deploy()
109 1
        for circuit_id in stored_circuits:
110 1
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
111 1
            self._load_evc(stored_circuits[circuit_id])
112
113 1
    def shutdown(self):
114
        """Execute when your napp is unloaded.
115
116
        If you have some cleanup procedure, insert it here.
117
        """
118
119 1
    @rest("/v2/evc/", methods=["GET"])
120 1
    def list_circuits(self):
121
        """Endpoint to return circuits stored.
122
123
        archive query arg if defined (not null) will be filtered
124
        accordingly, by default only non archived evcs will be listed
125
        """
126 1
        log.debug("list_circuits /v2/evc")
127 1
        archived = request.args.get("archived", "false").lower()
128 1
        archived_to_optional = {"null": None, "true": True, "false": False}
129 1
        archived = archived_to_optional.get(archived, False)
130 1
        circuits = self.mongo_controller.get_circuits(archived=archived)
131 1
        circuits = circuits['circuits']
132 1
        return jsonify(circuits), 200
133
134 1
    @rest("/v2/evc/<circuit_id>", methods=["GET"])
135 1
    def get_circuit(self, circuit_id):
136
        """Endpoint to return a circuit based on id."""
137 1
        log.debug("get_circuit /v2/evc/%s", circuit_id)
138 1
        circuit = self.mongo_controller.get_circuit(circuit_id)
139 1
        if not circuit:
140 1
            result = f"circuit_id {circuit_id} not found"
141 1
            log.debug("get_circuit result %s %s", result, 404)
142 1
            raise NotFound(result)
143 1
        status = 200
144 1
        log.debug("get_circuit result %s %s", circuit, status)
145 1
        return jsonify(circuit), status
146
147 1
    @rest("/v2/evc/", methods=["POST"])
148 1
    @validate(spec)
149 1
    def create_circuit(self, data):
150
        """Try to create a new circuit.
151
152
        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
153
        UNI_Z's requested C-VID are available from the interfaces' pools. This
154
        is checked when creating the UNI object.
155
156
        Then, E-Line NApp requests a primary and a backup path to the
157
        Pathfinder NApp using the attributes primary_links and backup_links
158
        submitted via REST
159
160
        # For each link composing paths in #3:
161
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
162
        #  - Using the S-VID obtained, generate abstract flow entries to be
163
        #    sent to FlowManager
164
165
        Push abstract flow entries to FlowManager and FlowManager pushes
166
        OpenFlow entries to datapaths
167
168
        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
169
        creation
170
171
        Finnaly, notify user of the status of its request.
172
        """
173
        # Try to create the circuit object
174 1
        log.debug("create_circuit /v2/evc/")
175
176 1
        try:
177 1
            evc = self._evc_from_dict(data)
178 1
        except ValueError as exception:
179 1
            log.debug("create_circuit result %s %s", exception, 400)
180 1
            raise BadRequest(str(exception)) from BadRequest
181
182 1
        if evc.primary_path:
183
            try:
184
                evc.primary_path.is_valid(
185
                    evc.uni_a.interface.switch,
186
                    evc.uni_z.interface.switch,
187
                    bool(evc.circuit_scheduler),
188
                )
189
            except InvalidPath as exception:
190
                raise BadRequest(
191
                    f"primary_path is not valid: {exception}"
192
                ) from exception
193 1
        if evc.backup_path:
194
            try:
195
                evc.backup_path.is_valid(
196
                    evc.uni_a.interface.switch,
197
                    evc.uni_z.interface.switch,
198
                    bool(evc.circuit_scheduler),
199
                )
200
            except InvalidPath as exception:
201
                raise BadRequest(
202
                    f"backup_path is not valid: {exception}"
203
                ) from exception
204
205
        # verify duplicated evc
206 1
        if self._is_duplicated_evc(evc):
207 1
            result = "The EVC already exists."
208 1
            log.debug("create_circuit result %s %s", result, 409)
209 1
            raise Conflict(result)
210
211 1
        try:
212 1
            evc._validate_has_primary_or_dynamic()
213 1
        except ValueError as exception:
214 1
            raise BadRequest(str(exception)) from exception
215
216
        # store circuit in dictionary
217 1
        self.circuits[evc.id] = evc
218
219
        # save circuit
220 1
        evc.sync()
221
222
        # Schedule the circuit deploy
223 1
        self.sched.add(evc)
224
225
        # Circuit has no schedule, deploy now
226 1
        if not evc.circuit_scheduler:
227 1
            with evc.lock:
228 1
                evc.deploy()
229
230
        # Notify users
231 1
        event = KytosEvent(
232
            name="kytos.mef_eline.created", content=evc.as_dict()
233
        )
234 1
        self.controller.buffers.app.put(event)
235
236 1
        result = {"circuit_id": evc.id}
237 1
        status = 201
238 1
        log.debug("create_circuit result %s %s", result, status)
239 1
        emit_event(self.controller, "created", evc_id=evc.id)
240 1
        return jsonify(result), status
241
242 1
    @rest("/v2/evc/<circuit_id>", methods=["PATCH"])
243 1
    def update(self, circuit_id):
244
        """Update a circuit based on payload.
245
246
        The EVC required attributes (name, uni_a, uni_z) can't be updated.
247
        """
248 1
        log.debug("update /v2/evc/%s", circuit_id)
249 1
        try:
250 1
            evc = self.circuits[circuit_id]
251 1
        except KeyError:
252 1
            result = f"circuit_id {circuit_id} not found"
253 1
            log.debug("update result %s %s", result, 404)
254 1
            raise NotFound(result) from NotFound
255
256 1
        if evc.archived:
257 1
            result = "Can't update archived EVC"
258 1
            log.debug("update result %s %s", result, 405)
259 1
            raise MethodNotAllowed(["GET"], result)
260
261 1
        try:
262 1
            data = request.get_json()
263 1
        except BadRequest:
264 1
            result = "The request body is not a well-formed JSON."
265 1
            log.debug("update result %s %s", result, 400)
266 1
            raise BadRequest(result) from BadRequest
267 1
        if data is None:
268 1
            result = "The request body mimetype is not application/json."
269 1
            log.debug("update result %s %s", result, 415)
270 1
            raise UnsupportedMediaType(result) from UnsupportedMediaType
271
272 1
        try:
273 1
            enable, redeploy = evc.update(
274
                **self._evc_dict_with_instances(data)
275
            )
276 1
        except ValueError as exception:
277 1
            log.error(exception)
278 1
            log.debug("update result %s %s", exception, 400)
279 1
            raise BadRequest(str(exception)) from BadRequest
280
281 1
        if evc.is_active():
282
            if enable is False:  # disable if active
283
                with evc.lock:
284
                    evc.remove()
285
            elif redeploy is not None:  # redeploy if active
286
                with evc.lock:
287
                    evc.remove()
288
                    evc.deploy()
289
        else:
290 1
            if enable is True:  # enable if inactive
291 1
                with evc.lock:
292 1
                    evc.deploy()
293 1
        result = {evc.id: evc.as_dict()}
294 1
        status = 200
295
296 1
        log.debug("update result %s %s", result, status)
297 1
        emit_event(self.controller, "updated", evc_id=evc.id, data=data)
298 1
        return jsonify(result), status
299
300 1
    @rest("/v2/evc/<circuit_id>", methods=["DELETE"])
301 1
    def delete_circuit(self, circuit_id):
302
        """Remove a circuit.
303
304
        First, the flows are removed from the switches, and then the EVC is
305
        disabled.
306
        """
307 1
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
308 1
        try:
309 1
            evc = self.circuits[circuit_id]
310 1
        except KeyError:
311 1
            result = f"circuit_id {circuit_id} not found"
312 1
            log.debug("delete_circuit result %s %s", result, 404)
313 1
            raise NotFound(result) from NotFound
314
315 1
        if evc.archived:
316 1
            result = f"Circuit {circuit_id} already removed"
317 1
            log.debug("delete_circuit result %s %s", result, 404)
318 1
            raise NotFound(result) from NotFound
319
320 1
        log.info("Removing %s", evc)
321 1
        with evc.lock:
322 1
            evc.remove_current_flows()
323 1
            evc.remove_failover_flows(sync=False)
324 1
            evc.deactivate()
325 1
            evc.disable()
326 1
            self.sched.remove(evc)
327 1
            evc.archive()
328 1
            evc.sync()
329 1
        log.info("EVC removed. %s", evc)
330 1
        result = {"response": f"Circuit {circuit_id} removed"}
331 1
        status = 200
332
333 1
        log.debug("delete_circuit result %s %s", result, status)
334 1
        emit_event(self.controller, "deleted", evc_id=evc.id)
335 1
        return jsonify(result), status
336
337 1
    @rest("v2/evc/<circuit_id>/metadata", methods=["GET"])
338 1
    def get_metadata(self, circuit_id):
339
        """Get metadata from an EVC."""
340 1
        try:
341 1
            return (
342
                jsonify({"metadata": self.circuits[circuit_id].metadata}),
343
                200,
344
            )
345
        except KeyError as error:
346
            raise NotFound(f"circuit_id {circuit_id} not found.") from error
347
348 1
    @rest("v2/evc/<circuit_id>/metadata", methods=["POST"])
349 1
    def add_metadata(self, circuit_id):
350
        """Add metadata to an EVC."""
351 1
        try:
352 1
            metadata = request.get_json()
353 1
            content_type = request.content_type
354 1
        except BadRequest as error:
355 1
            result = "The request body is not a well-formed JSON."
356 1
            raise BadRequest(result) from error
357 1
        if content_type is None:
358 1
            result = "The request body is empty."
359 1
            raise BadRequest(result)
360 1
        if metadata is None:
361 1
            if content_type != "application/json":
362 1
                result = (
363
                    "The content type must be application/json "
364
                    f"(received {content_type})."
365
                )
366
            else:
367
                result = "Metadata is empty."
368 1
            raise UnsupportedMediaType(result)
369
370 1
        try:
371 1
            evc = self.circuits[circuit_id]
372 1
        except KeyError as error:
373 1
            raise NotFound(f"circuit_id {circuit_id} not found.") from error
374
375 1
        evc.extend_metadata(metadata)
376 1
        evc.sync()
377 1
        return jsonify("Operation successful"), 201
378
379 1
    @rest("v2/evc/<circuit_id>/metadata/<key>", methods=["DELETE"])
380 1
    def delete_metadata(self, circuit_id, key):
381
        """Delete metadata from an EVC."""
382 1
        try:
383 1
            evc = self.circuits[circuit_id]
384 1
        except KeyError as error:
385 1
            raise NotFound(f"circuit_id {circuit_id} not found.") from error
386
387 1
        evc.remove_metadata(key)
388 1
        evc.sync()
389 1
        return jsonify("Operation successful"), 200
390
391 1
    @rest("/v2/evc/<circuit_id>/redeploy", methods=["PATCH"])
392 1
    def redeploy(self, circuit_id):
393
        """Endpoint to force the redeployment of an EVC."""
394 1
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
395 1
        try:
396 1
            evc = self.circuits[circuit_id]
397 1
        except KeyError:
398 1
            result = f"circuit_id {circuit_id} not found"
399 1
            raise NotFound(result) from NotFound
400 1
        if evc.is_enabled():
401 1
            with evc.lock:
402 1
                evc.remove_current_flows()
403 1
                evc.deploy()
404 1
            result = {"response": f"Circuit {circuit_id} redeploy received."}
405 1
            status = 202
406
        else:
407 1
            result = {"response": f"Circuit {circuit_id} is disabled."}
408 1
            status = 409
409
410 1
        return jsonify(result), status
411
412 1
    @rest("/v2/evc/schedule", methods=["GET"])
413 1
    def list_schedules(self):
414
        """Endpoint to return all schedules stored for all circuits.
415
416
        Return a JSON with the following template:
417
        [{"schedule_id": <schedule_id>,
418
         "circuit_id": <circuit_id>,
419
         "schedule": <schedule object>}]
420
        """
421 1
        log.debug("list_schedules /v2/evc/schedule")
422 1
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
423 1
        if not circuits:
424 1
            result = {}
425 1
            status = 200
426 1
            return jsonify(result), status
427
428 1
        result = []
429 1
        status = 200
430 1
        for circuit in circuits:
431 1
            circuit_scheduler = circuit.get("circuit_scheduler")
432 1
            if circuit_scheduler:
433 1
                for scheduler in circuit_scheduler:
434 1
                    value = {
435
                        "schedule_id": scheduler.get("id"),
436
                        "circuit_id": circuit.get("id"),
437
                        "schedule": scheduler,
438
                    }
439 1
                    result.append(value)
440
441 1
        log.debug("list_schedules result %s %s", result, status)
442 1
        return jsonify(result), status
443
444 1
    @rest("/v2/evc/schedule/", methods=["POST"])
445 1
    def create_schedule(self):
446
        """
447
        Create a new schedule for a given circuit.
448
449
        This service do no check if there are conflicts with another schedule.
450
        Payload example:
451
            {
452
              "circuit_id":"aa:bb:cc",
453
              "schedule": {
454
                "date": "2019-08-07T14:52:10.967Z",
455
                "interval": "string",
456
                "frequency": "1 * * * *",
457
                "action": "create"
458
              }
459
            }
460
        """
461 1
        log.debug("create_schedule /v2/evc/schedule/")
462
463 1
        json_data = self._json_from_request("create_schedule")
464 1
        try:
465 1
            circuit_id = json_data["circuit_id"]
466 1
        except TypeError:
467 1
            result = "The payload should have a dictionary."
468 1
            log.debug("create_schedule result %s %s", result, 400)
469 1
            raise BadRequest(result) from BadRequest
470 1
        except KeyError:
471 1
            result = "Missing circuit_id."
472 1
            log.debug("create_schedule result %s %s", result, 400)
473 1
            raise BadRequest(result) from BadRequest
474
475 1
        try:
476 1
            schedule_data = json_data["schedule"]
477 1
        except KeyError:
478 1
            result = "Missing schedule data."
479 1
            log.debug("create_schedule result %s %s", result, 400)
480 1
            raise BadRequest(result) from BadRequest
481
482
        # Get EVC from circuits buffer
483 1
        circuits = self._get_circuits_buffer()
484
485
        # get the circuit
486 1
        evc = circuits.get(circuit_id)
487
488
        # get the circuit
489 1
        if not evc:
490 1
            result = f"circuit_id {circuit_id} not found"
491 1
            log.debug("create_schedule result %s %s", result, 404)
492 1
            raise NotFound(result) from NotFound
493
        # Can not modify circuits deleted and archived
494 1
        if evc.archived:
495 1
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
496 1
            log.debug("create_schedule result %s %s", result, 403)
497 1
            raise Forbidden(result) from Forbidden
498
499
        # new schedule from dict
500 1
        new_schedule = CircuitSchedule.from_dict(schedule_data)
501
502
        # If there is no schedule, create the list
503 1
        if not evc.circuit_scheduler:
504 1
            evc.circuit_scheduler = []
505
506
        # Add the new schedule
507 1
        evc.circuit_scheduler.append(new_schedule)
508
509
        # Add schedule job
510 1
        self.sched.add_circuit_job(evc, new_schedule)
511
512
        # save circuit to mongodb
513 1
        evc.sync()
514
515 1
        result = new_schedule.as_dict()
516 1
        status = 201
517
518 1
        log.debug("create_schedule result %s %s", result, status)
519 1
        return jsonify(result), status
520
521 1
    @rest("/v2/evc/schedule/<schedule_id>", methods=["PATCH"])
522 1
    def update_schedule(self, schedule_id):
523
        """Update a schedule.
524
525
        Change all attributes from the given schedule from a EVC circuit.
526
        The schedule ID is preserved as default.
527
        Payload example:
528
            {
529
              "date": "2019-08-07T14:52:10.967Z",
530
              "interval": "string",
531
              "frequency": "1 * * *",
532
              "action": "create"
533
            }
534
        """
535 1
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)
536
537
        # Try to find a circuit schedule
538 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
539
540
        # Can not modify circuits deleted and archived
541 1
        if not found_schedule:
542 1
            result = f"schedule_id {schedule_id} not found"
543 1
            log.debug("update_schedule result %s %s", result, 404)
544 1
            raise NotFound(result) from NotFound
545 1
        if evc.archived:
546 1
            result = f"Circuit {evc.id} is archived. Update is forbidden."
547 1
            log.debug("update_schedule result %s %s", result, 403)
548 1
            raise Forbidden(result) from Forbidden
549
550 1
        data = self._json_from_request("update_schedule")
551
552 1
        new_schedule = CircuitSchedule.from_dict(data)
553 1
        new_schedule.id = found_schedule.id
554
        # Remove the old schedule
555 1
        evc.circuit_scheduler.remove(found_schedule)
556
        # Append the modified schedule
557 1
        evc.circuit_scheduler.append(new_schedule)
558
559
        # Cancel all schedule jobs
560 1
        self.sched.cancel_job(found_schedule.id)
561
        # Add the new circuit schedule
562 1
        self.sched.add_circuit_job(evc, new_schedule)
563
        # Save EVC to mongodb
564 1
        evc.sync()
565
566 1
        result = new_schedule.as_dict()
567 1
        status = 200
568
569 1
        log.debug("update_schedule result %s %s", result, status)
570 1
        return jsonify(result), status
571
572 1
    @rest("/v2/evc/schedule/<schedule_id>", methods=["DELETE"])
573 1
    def delete_schedule(self, schedule_id):
574
        """Remove a circuit schedule.
575
576
        Remove the Schedule from EVC.
577
        Remove the Schedule from cron job.
578
        Save the EVC to the Storehouse.
579
        """
580 1
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
581 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
582
583
        # Can not modify circuits deleted and archived
584 1
        if not found_schedule:
585 1
            result = f"schedule_id {schedule_id} not found"
586 1
            log.debug("delete_schedule result %s %s", result, 404)
587 1
            raise NotFound(result)
588
589 1
        if evc.archived:
590 1
            result = f"Circuit {evc.id} is archived. Update is forbidden."
591 1
            log.debug("delete_schedule result %s %s", result, 403)
592 1
            raise Forbidden(result)
593
594
        # Remove the old schedule
595 1
        evc.circuit_scheduler.remove(found_schedule)
596
597
        # Cancel all schedule jobs
598 1
        self.sched.cancel_job(found_schedule.id)
599
        # Save EVC to mongodb
600 1
        evc.sync()
601
602 1
        result = "Schedule removed"
603 1
        status = 200
604
605 1
        log.debug("delete_schedule result %s %s", result, status)
606 1
        return jsonify(result), status
607
608 1
    def _is_duplicated_evc(self, evc):
609
        """Verify if the circuit given is duplicated with the stored evcs.
610
611
        Args:
612
            evc (EVC): circuit to be analysed.
613
614
        Returns:
615
            boolean: True if the circuit is duplicated, otherwise False.
616
617
        """
618 1
        for circuit in tuple(self.circuits.values()):
619 1
            if not circuit.archived and circuit.shares_uni(evc):
620 1
                return True
621 1
        return False
622
623 1
    @listen_to("kytos/topology.link_up")
624 1
    def on_link_up(self, event):
625
        """Change circuit when link is up or end_maintenance."""
626
        self.handle_link_up(event)
627
628 1
    def handle_link_up(self, event):
629
        """Change circuit when link is up or end_maintenance."""
630 1
        log.debug("Event handle_link_up %s", event)
631 1
        for evc in self.get_evcs_by_svc_level():
632 1
            if evc.is_enabled() and not evc.archived:
633 1
                with evc.lock:
634 1
                    evc.handle_link_up(event.content["link"])
635
636 1
    @listen_to("kytos/topology.link_down")
637 1
    def on_link_down(self, event):
638
        """Change circuit when link is down or under_mantenance."""
639
        self.handle_link_down(event)
640
641 1
    def handle_link_down(self, event):
642
        """Change circuit when link is down or under_mantenance."""
643 1
        link = event.content["link"]
644 1
        log.info("Event handle_link_down %s", link)
645 1
        switch_flows = {}
646 1
        evcs_with_failover = []
647 1
        evcs_normal = []
648 1
        check_failover = []
649 1
        for evc in self.get_evcs_by_svc_level():
650 1
            if evc.is_affected_by_link(link):
651
                # if there is no failover path, handles link down the
652
                # tradditional way
653 1
                if (
654
                    not getattr(evc, 'failover_path', None) or
655
                    evc.is_failover_path_affected_by_link(link)
656
                ):
657 1
                    evcs_normal.append(evc)
658 1
                    continue
659 1
                for dpid, flows in evc.get_failover_flows().items():
660 1
                    switch_flows.setdefault(dpid, [])
661 1
                    switch_flows[dpid].extend(flows)
662 1
                evcs_with_failover.append(evc)
663
            else:
664 1
                check_failover.append(evc)
665
666 1
        offset = 0
667 1
        while switch_flows:
668 1
            offset = (offset + settings.BATCH_SIZE) or None
669 1
            switches = list(switch_flows.keys())
670 1
            for dpid in switches:
671 1
                emit_event(
672
                    self.controller,
673
                    context="kytos.flow_manager",
674
                    name="flows.install",
675
                    dpid=dpid,
676
                    flow_dict={"flows": switch_flows[dpid][:offset]},
677
                )
678 1
                if offset is None or offset >= len(switch_flows[dpid]):
679 1
                    del switch_flows[dpid]
680 1
                    continue
681 1
                switch_flows[dpid] = switch_flows[dpid][offset:]
682 1
            time.sleep(settings.BATCH_INTERVAL)
683
684 1
        for evc in evcs_with_failover:
685 1
            with evc.lock:
686 1
                old_path = evc.current_path
687 1
                evc.current_path = evc.failover_path
688 1
                evc.failover_path = old_path
689 1
                evc.sync()
690 1
            emit_event(self.controller, "redeployed_link_down", evc_id=evc.id)
691 1
            log.info(
692
                f"{evc} redeployed with failover due to link down {link.id}"
693
            )
694
695 1
        for evc in evcs_normal:
696 1
            emit_event(
697
                self.controller,
698
                "evc_affected_by_link_down",
699
                evc_id=evc.id,
700
                link_id=link.id,
701
            )
702
703
        # After handling the hot path, check if new failover paths are needed.
704
        # Note that EVCs affected by link down will generate a KytosEvent for
705
        # deployed|redeployed, which will trigger the failover path setup.
706
        # Thus, we just need to further check the check_failover list
707 1
        for evc in check_failover:
708 1
            if evc.is_failover_path_affected_by_link(link):
709 1
                evc.setup_failover_path()
710
711 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
712 1
    def on_evc_affected_by_link_down(self, event):
713
        """Change circuit when link is down or under_mantenance."""
714
        self.handle_evc_affected_by_link_down(event)
715
716 1
    def handle_evc_affected_by_link_down(self, event):
717
        """Change circuit when link is down or under_mantenance."""
718 1
        evc = self.circuits.get(event.content["evc_id"])
719 1
        link_id = event.content['link_id']
720 1
        if not evc:
721 1
            return
722 1
        with evc.lock:
723 1
            result = evc.handle_link_down()
724 1
        event_name = "error_redeploy_link_down"
725 1
        if result:
726 1
            log.info(f"{evc} redeployed due to link down {link_id}")
727 1
            event_name = "redeployed_link_down"
728 1
        emit_event(self.controller, event_name, evc_id=evc.id)
729
730 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
731 1
    def on_evc_deployed(self, event):
732
        """Handle EVC deployed|redeployed_link_down."""
733
        self.handle_evc_deployed(event)
734
735 1
    def handle_evc_deployed(self, event):
736
        """Setup failover path on evc deployed."""
737 1
        evc = self.circuits.get(event.content["evc_id"])
738 1
        if not evc:
739 1
            return
740 1
        with evc.lock:
741 1
            evc.setup_failover_path()
742
743 1
    @listen_to("kytos/topology.topology_loaded")
744 1
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
745
        """Load EVCs once the topology is available."""
746
        self.load_all_evcs()
747
748 1
    def load_all_evcs(self):
749
        """Try to load all EVCs on startup."""
750 1
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
751 1
        for circuit_id, circuit in circuits:
752 1
            if circuit_id not in self.circuits:
753 1
                self._load_evc(circuit)
754
755 1
    def _load_evc(self, circuit_dict):
756
        """Load one EVC from mongodb to memory."""
757 1
        try:
758 1
            evc = self._evc_from_dict(circuit_dict)
759 1
        except ValueError as exception:
760 1
            log.error(
761
                f"Could not load EVC: dict={circuit_dict} error={exception}"
762
            )
763 1
            return None
764
765 1
        if evc.archived:
766 1
            return None
767 1
        evc.deactivate()
768 1
        evc.sync()
769 1
        self.circuits.setdefault(evc.id, evc)
770 1
        self.sched.add(evc)
771 1
        return evc
772
773 1
    @listen_to("kytos/flow_manager.flow.error")
774 1
    def on_flow_mod_error(self, event):
775
        """Handle flow mod errors related to an EVC."""
776
        self.handle_flow_mod_error(event)
777
778 1
    def handle_flow_mod_error(self, event):
779
        """Handle flow mod errors related to an EVC."""
780 1
        flow = event.content["flow"]
781 1
        command = event.content.get("error_command")
782 1
        if command != "add":
783
            return
784 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
785 1
        if evc:
786 1
            evc.remove_current_flows()
787
788 1
    def _evc_dict_with_instances(self, evc_dict):
789
        """Convert some dict values to instance of EVC classes.
790
791
        This method will convert: [UNI, Link]
792
        """
793 1
        data = evc_dict.copy()  # Do not modify the original dict
794 1
        for attribute, value in data.items():
795
            # Get multiple attributes.
796
            # Ex: uni_a, uni_z
797 1
            if "uni" in attribute:
798 1
                try:
799 1
                    data[attribute] = self._uni_from_dict(value)
800 1
                except ValueError as exception:
801 1
                    result = "Error creating UNI: Invalid value"
802 1
                    raise ValueError(result) from exception
803
804 1
            if attribute == "circuit_scheduler":
805 1
                data[attribute] = []
806 1
                for schedule in value:
807 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
808
809
            # Get multiple attributes.
810
            # Ex: primary_links,
811
            #     backup_links,
812
            #     current_links_cache,
813
            #     primary_links_cache,
814
            #     backup_links_cache
815 1
            if "links" in attribute:
816 1
                data[attribute] = [
817
                    self._link_from_dict(link) for link in value
818
                ]
819
820
            # Ex: current_path,
821
            #     primary_path,
822
            #     backup_path
823 1
            if "path" in attribute and attribute != "dynamic_backup_path":
824 1
                data[attribute] = Path(
825
                    [self._link_from_dict(link) for link in value]
826
                )
827
828 1
        return data
829
830 1
    def _evc_from_dict(self, evc_dict):
831 1
        data = self._evc_dict_with_instances(evc_dict)
832 1
        return EVC(self.controller, **data)
833
834 1
    def _uni_from_dict(self, uni_dict):
835
        """Return a UNI object from python dict."""
836 1
        if uni_dict is None:
837 1
            return False
838
839 1
        interface_id = uni_dict.get("interface_id")
840 1
        interface = self.controller.get_interface_by_id(interface_id)
841 1
        if interface is None:
842 1
            result = (
843
                "Error creating UNI:"
844
                + f"Could not instantiate interface {interface_id}"
845
            )
846 1
            raise ValueError(result) from ValueError
847
848 1
        tag_dict = uni_dict.get("tag", None)
849 1
        if tag_dict:
850 1
            tag = TAG.from_dict(tag_dict)
851
        else:
852 1
            tag = None
853 1
        uni = UNI(interface, tag)
854
855 1
        return uni
856
857 1
    def _link_from_dict(self, link_dict):
858
        """Return a Link object from python dict."""
859 1
        id_a = link_dict.get("endpoint_a").get("id")
860 1
        id_b = link_dict.get("endpoint_b").get("id")
861
862 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
863 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
864 1
        if not endpoint_a:
865
            error_msg = f"Could not get interface endpoint_a id {id_a}"
866
            raise ValueError(error_msg)
867 1
        if not endpoint_b:
868
            error_msg = f"Could not get interface endpoint_b id {id_b}"
869
            raise ValueError(error_msg)
870
871 1
        link = Link(endpoint_a, endpoint_b)
872 1
        if "metadata" in link_dict:
873
            link.extend_metadata(link_dict.get("metadata"))
874
875 1
        s_vlan = link.get_metadata("s_vlan")
876 1
        if s_vlan:
877
            tag = TAG.from_dict(s_vlan)
878
            if tag is False:
879
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
880
                raise ValueError(error_msg)
881
            link.update_metadata("s_vlan", tag)
882 1
        return link
883
884 1
    def _find_evc_by_schedule_id(self, schedule_id):
885
        """
886
        Find an EVC and CircuitSchedule based on schedule_id.
887
888
        :param schedule_id: Schedule ID
889
        :return: EVC and Schedule
890
        """
891 1
        circuits = self._get_circuits_buffer()
892 1
        found_schedule = None
893 1
        evc = None
894
895
        # pylint: disable=unused-variable
896 1
        for c_id, circuit in circuits.items():
897 1
            for schedule in circuit.circuit_scheduler:
898 1
                if schedule.id == schedule_id:
899 1
                    found_schedule = schedule
900 1
                    evc = circuit
901 1
                    break
902 1
            if found_schedule:
903 1
                break
904 1
        return evc, found_schedule
905
906 1
    def _get_circuits_buffer(self):
907
        """
908
        Return the circuit buffer.
909
910
        If the buffer is empty, try to load data from mongodb.
911
        """
912 1
        if not self.circuits:
913
            # Load circuits from mongodb to buffer
914 1
            circuits = self.mongo_controller.get_circuits()['circuits']
915 1
            for c_id, circuit in circuits.items():
916 1
                evc = self._evc_from_dict(circuit)
917 1
                self.circuits[c_id] = evc
918 1
        return self.circuits
919
920 1
    @staticmethod
921 1
    def _json_from_request(caller):
922
        """Return a json from request.
923
924
        If it was not possible to get a json from the request, log, for debug,
925
        who was the caller and the error that ocurred, and raise an
926
        Exception.
927
        """
928 1
        try:
929 1
            json_data = request.get_json()
930
        except ValueError as exception:
931
            log.error(exception)
932
            log.debug(f"{caller} result {exception} 400")
933
            raise BadRequest(str(exception)) from BadRequest
934
        except BadRequest:
935
            result = "The request is not a valid JSON."
936
            log.debug(f"{caller} result {result} 400")
937
            raise BadRequest(result) from BadRequest
938 1
        if json_data is None:
939 1
            result = "Content-Type must be application/json"
940 1
            log.debug(f"{caller} result {result} 415")
941 1
            raise UnsupportedMediaType(result)
942
        return json_data
943