Passed
Pull Request — master (#224)
by
unknown
03:13
created

build.main.Main.handle_flow_mod_error()   A

Complexity

Conditions 3

Size

Total Lines 9
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 3.0175

Importance

Changes 0
Metric Value
cc 3
eloc 8
nop 2
dl 0
loc 9
rs 10
c 0
b 0
f 0
ccs 7
cts 8
cp 0.875
crap 3.0175
1
# pylint: disable=protected-access
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import time
7 1
from threading import Lock
8
9 1
from flask import jsonify, request
10 1
from werkzeug.exceptions import (BadRequest, Conflict, Forbidden,
11
                                 MethodNotAllowed, NotFound,
12
                                 UnsupportedMediaType)
13
14 1
from kytos.core import KytosNApp, log, rest
15 1
from kytos.core.events import KytosEvent
16 1
from kytos.core.helpers import listen_to
17 1
from kytos.core.interface import TAG, UNI
18 1
from kytos.core.link import Link
19 1
from napps.kytos.mef_eline import controllers, settings
20 1
from napps.kytos.mef_eline.exceptions import InvalidPath
21 1
from napps.kytos.mef_eline.models import EVC, DynamicPathManager, Path
22 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
23 1
from napps.kytos.mef_eline.utils import emit_event, load_spec, validate
24
25
26
# pylint: disable=too-many-public-methods
27 1
class Main(KytosNApp):
28
    """Main class of amlight/mef_eline NApp.
29
30
    This class is the entry point for this napp.
31
    """
32
33 1
    spec = load_spec()
34
35 1
    def setup(self):
        """Initialise the NApp state in place of ``__init__``.

        The controller calls this method automatically when the
        application is loaded, so every start-up routine lives here.
        """
        # Scheduler used for circuit events.
        self.sched = Scheduler()

        # Controller used to save and load circuits.
        self.mongo_controller = self.get_eline_controller()
        self.mongo_controller.bootstrap_indexes()

        # Tell the dynamic path manager which controller it works with.
        DynamicPathManager.set_controller(self.controller)

        # In-memory buffer of created EVCs, keyed by circuit id.
        # Every create/update/delete must also be synced to mongodb.
        self.circuits = {}

        self._lock = Lock()
        # Starts below zero: ``execute`` only counts its first invocation.
        self.execution_rounds = -1
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

        self.load_all_evcs()
62
63 1
    def get_evcs_by_svc_level(self) -> list:
        """Return the buffered circuits ordered by priority.

        Circuits come first by descending ``service_level`` and, within the
        same level, by ascending ``creation_time``.

        In the future, as more ops are offloaded, this should come from
        the DB.
        """
        def priority(evc):
            # Negated level turns the ascending sort into a descending one.
            return (-evc.service_level, evc.creation_time)

        ordered = list(self.circuits.values())
        ordered.sort(key=priority)
        return ordered
70
71 1
    @staticmethod
    def get_eline_controller():
        """Build and return a new ``ELineController`` instance."""
        eline_controller = controllers.ELineController()
        return eline_controller
75
76 1
    def execute(self):
        """Run the consistency routine unless one is already in progress.

        ``setup`` registers this method through ``execute_as_loop``. The
        first invocation only bumps ``execution_rounds`` (which starts at
        -1) and returns without checking anything.
        """
        if self.execution_rounds < 0:
            self.execution_rounds += 1
            return
        # Skip this round when a previous routine still holds the lock.
        if not self._lock.locked():
            log.debug("Starting consistency routine")
            with self._lock:
                self.execute_consistency()
            log.debug("Finished consistency routine")
87
88 1
    def execute_consistency(self):
        """Run one pass of the consistency routine.

        Enabled-but-inactive circuits are activated when their traces check
        out, or redeployed once ``execution_rounds`` exceeds
        ``settings.WAIT_FOR_OLD_PATH``. Circuits present in mongodb but
        missing from the in-memory buffer are loaded at the end.
        """
        self.execution_rounds += 1
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
        for circuit in self.get_evcs_by_svc_level():
            # Whatever is left in ``stored_circuits`` after this loop is
            # stored in the database but not loaded in memory.
            stored_circuits.pop(circuit.id, None)
            if (
                not circuit.is_enabled()
                or circuit.is_active()
                or circuit.lock.locked()
                or circuit.recent_updated()
            ):
                continue
            if circuit.check_traces():
                log.info(f"{circuit} enabled but inactive - activating")
                with circuit.lock:
                    circuit.activate()
                    circuit.sync()
            elif self.execution_rounds > settings.WAIT_FOR_OLD_PATH:
                log.info(f"{circuit} enabled but inactive - redeploy")
                with circuit.lock:
                    circuit.deploy()
        for circuit_id, circuit_dict in stored_circuits.items():
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
            self._load_evc(circuit_dict)
113
114 1
    @listen_to('kytos/flow_manager.flow.removed')
    def on_flow_mod_delete(self, event):
        """Listen to flow-removed events and hand them to the handler.

        Keeps track of when flows got removed by delegating to
        ``handle_flow_mod_delete``.
        """
        self.handle_flow_mod_delete(event)
118
119 1
    def handle_flow_mod_delete(self, event):
        """Sync the EVC whose flow was removed.

        The EVC is looked up by the id derived from the removed flow's
        cookie; nothing happens when no buffered circuit matches.
        """
        removed_flow = event.content["flow"]
        evc_id = EVC.get_id_from_cookie(removed_flow.cookie)
        evc = self.circuits.get(evc_id)
        if not evc:
            return
        evc.sync()
125
126 1
    def shutdown(self):
        """Run when the NApp is unloaded.

        No cleanup is needed at the moment; add any teardown procedure
        here.
        """
131
132 1
    @rest("/v2/evc/", methods=["GET"])
    def list_circuits(self):
        """Endpoint that returns the stored circuits.

        The optional ``archived`` query argument accepts ``null``, ``true``
        or ``false`` (case-insensitive) and is forwarded as a filter. By
        default, and for any unrecognised value, only non-archived EVCs are
        listed.
        """
        log.debug("list_circuits /v2/evc")
        archived_by_arg = {"null": None, "true": True, "false": False}
        raw_archived = request.args.get("archived", "false").lower()
        archived = archived_by_arg.get(raw_archived, False)
        stored = self.mongo_controller.get_circuits(archived=archived)
        return jsonify(stored['circuits']), 200
146
147 1
    @rest("/v2/evc/<circuit_id>", methods=["GET"])
    def get_circuit(self, circuit_id):
        """Endpoint that returns a single stored circuit by id.

        Raises:
            NotFound: when no circuit is stored under ``circuit_id``.
        """
        log.debug("get_circuit /v2/evc/%s", circuit_id)
        circuit = self.mongo_controller.get_circuit(circuit_id)
        if circuit:
            log.debug("get_circuit result %s %s", circuit, 200)
            return jsonify(circuit), 200
        message = f"circuit_id {circuit_id} not found"
        log.debug("get_circuit result %s %s", message, 404)
        raise NotFound(message)
159
160 1
    @rest("/v2/evc/", methods=["POST"])
    @validate(spec)
    def create_circuit(self, data):
        """Try to create a new circuit.

        Steps performed, in order:

        1. Build the EVC from ``data``. For EVPL, the availability of the
           requested C-VIDs of UNI_A and UNI_Z is checked when the UNI
           objects are created.
        2. Validate ``primary_path`` and ``backup_path``, when given,
           against the switches of both UNIs.
        3. Reject the request if an equivalent EVC already exists, or if
           the UNIs are on different switches and the EVC has neither a
           primary path nor dynamic paths allowed.
        4. Store the EVC in the buffer, sync it and add it to the
           scheduler; deploy right away when it has no circuit scheduler.
        5. Notify the other NApps of the new EVC and, finally, notify the
           user of the status of the request.

        Args:
            data (dict): request payload (presumably supplied by the
                ``validate`` decorator after checking it against the spec).

        Returns:
            tuple: JSON response with the new ``circuit_id`` and 201.

        Raises:
            BadRequest: invalid payload values or invalid static path.
            Conflict: the EVC already exists.
        """
        log.debug("create_circuit /v2/evc/")

        # Try to create the circuit object
        try:
            evc = self._evc_from_dict(data)
        except ValueError as exception:
            log.debug("create_circuit result %s %s", exception, 400)
            # Chain the real cause, not the BadRequest class itself.
            raise BadRequest(str(exception)) from exception

        # Both static paths go through the very same validation.
        for path_name in ("primary_path", "backup_path"):
            path = getattr(evc, path_name)
            if not path:
                continue
            try:
                path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise BadRequest(
                    f"{path_name} is not valid: {exception}"
                ) from exception

        # verify duplicated evc
        if self._is_duplicated_evc(evc):
            result = "The EVC already exists."
            log.debug("create_circuit result %s %s", result, 409)
            raise Conflict(result)

        if (
            not evc.primary_path
            and evc.dynamic_backup_path is False
            and evc.uni_a.interface.switch != evc.uni_z.interface.switch
        ):
            result = "The EVC must have a primary path or allow dynamic paths."
            log.debug("create_circuit result %s %s", result, 400)
            raise BadRequest(result)

        # store circuit in dictionary
        self.circuits[evc.id] = evc

        # save circuit
        evc.sync()

        # Schedule the circuit deploy
        self.sched.add(evc)

        # Circuit has no schedule, deploy now
        if not evc.circuit_scheduler:
            with evc.lock:
                evc.deploy()

        # Notify users
        event = KytosEvent(
            name="kytos.mef_eline.created", content=evc.as_dict()
        )
        self.controller.buffers.app.put(event)

        result = {"circuit_id": evc.id}
        status = 201
        log.debug("create_circuit result %s %s", result, status)
        emit_event(self.controller, "created", evc_id=evc.id)
        return jsonify(result), status
258
259 1
    @rest("/v2/evc/<circuit_id>", methods=["PATCH"])
    def update(self, circuit_id):
        """Update a circuit based on the request payload.

        The EVC required attributes (name, uni_a, uni_z) can't be updated.

        After the attributes are updated, an active EVC is removed when it
        was disabled, or removed and deployed again when a redeploy was
        requested; an inactive EVC is deployed when it was enabled.

        Returns:
            tuple: JSON response mapping the EVC id to its dict, and 200.

        Raises:
            NotFound: ``circuit_id`` is not in the circuits buffer.
            MethodNotAllowed: the EVC is archived.
            BadRequest: malformed JSON body or invalid attribute values.
            UnsupportedMediaType: the body is not ``application/json``.
        """
        log.debug("update /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("update result %s %s", result, 404)
            # Chain the real cause, not the exception class.
            raise NotFound(result) from error

        if evc.archived:
            result = "Can't update archived EVC"
            log.debug("update result %s %s", result, 405)
            raise MethodNotAllowed(["GET"], result)

        try:
            data = request.get_json()
        except BadRequest as error:
            result = "The request body is not a well-formed JSON."
            log.debug("update result %s %s", result, 400)
            raise BadRequest(result) from error
        if data is None:
            result = "The request body mimetype is not application/json."
            log.debug("update result %s %s", result, 415)
            # Not inside an ``except`` block: there is no cause to chain.
            raise UnsupportedMediaType(result)

        try:
            enable, redeploy = evc.update(
                **self._evc_dict_with_instances(data)
            )
        except ValueError as exception:
            log.error(exception)
            log.debug("update result %s %s", exception, 400)
            raise BadRequest(str(exception)) from exception

        if evc.is_active():
            if enable is False:  # disable if active
                with evc.lock:
                    evc.remove()
            elif redeploy is not None:  # redeploy if active
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        elif enable is True:  # enable if inactive
            with evc.lock:
                evc.deploy()
        result = {evc.id: evc.as_dict()}
        status = 200

        log.debug("update result %s %s", result, status)
        emit_event(self.controller, "updated", evc_id=evc.id, data=data)
        return jsonify(result), status
316
317 1
    @rest("/v2/evc/<circuit_id>", methods=["DELETE"])
    def delete_circuit(self, circuit_id):
        """Remove a circuit.

        First, the flows are removed from the switches, and then the EVC is
        deactivated, disabled, removed from the scheduler, archived and
        synced.

        Returns:
            tuple: JSON response confirming the removal, and 200.

        Raises:
            NotFound: ``circuit_id`` is not in the circuits buffer, or the
                circuit was already archived.
        """
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("delete_circuit result %s %s", result, 404)
            # Chain the real cause, not the exception class.
            raise NotFound(result) from error

        if evc.archived:
            result = f"Circuit {circuit_id} already removed"
            log.debug("delete_circuit result %s %s", result, 404)
            # Not inside an ``except`` block: there is no cause to chain.
            raise NotFound(result)

        log.info("Removing %s", evc)
        with evc.lock:
            evc.remove_current_flows()
            evc.remove_failover_flows(sync=False)
            evc.deactivate()
            evc.disable()
            self.sched.remove(evc)
            evc.archive()
            evc.sync()
        log.info("EVC removed. %s", evc)
        result = {"response": f"Circuit {circuit_id} removed"}
        status = 200

        log.debug("delete_circuit result %s %s", result, status)
        emit_event(self.controller, "deleted", evc_id=evc.id)
        return jsonify(result), status
353
354 1
    @rest("v2/evc/<circuit_id>/metadata", methods=["GET"])
    def get_metadata(self, circuit_id):
        """Return the metadata of an EVC.

        Raises:
            NotFound: ``circuit_id`` is not in the circuits buffer.
        """
        try:
            metadata = self.circuits[circuit_id].metadata
            payload = jsonify({"metadata": metadata})
        except KeyError as error:
            raise NotFound(f"circuit_id {circuit_id} not found.") from error
        return payload, 200
364
365 1
    @rest("v2/evc/<circuit_id>/metadata", methods=["POST"])
    def add_metadata(self, circuit_id):
        """Add the metadata sent in the request body to an EVC.

        Returns:
            tuple: JSON confirmation message and 201.

        Raises:
            BadRequest: the body is empty or is not well-formed JSON.
            UnsupportedMediaType: no metadata could be read from the body.
            NotFound: ``circuit_id`` is not in the circuits buffer.
        """
        try:
            metadata = request.get_json()
            content_type = request.content_type
        except BadRequest as error:
            raise BadRequest(
                "The request body is not a well-formed JSON."
            ) from error
        if content_type is None:
            raise BadRequest("The request body is empty.")
        if metadata is None:
            if content_type == "application/json":
                reason = "Metadata is empty."
            else:
                reason = (
                    "The content type must be application/json "
                    f"(received {content_type})."
                )
            raise UnsupportedMediaType(reason)

        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise NotFound(f"circuit_id {circuit_id} not found.") from error

        evc.extend_metadata(metadata)
        evc.sync()
        return jsonify("Operation successful"), 201
395
396 1
    @rest("v2/evc/<circuit_id>/metadata/<key>", methods=["DELETE"])
    def delete_metadata(self, circuit_id, key):
        """Remove the metadata entry ``key`` from an EVC and sync it.

        Raises:
            NotFound: ``circuit_id`` is not in the circuits buffer.
        """
        try:
            target = self.circuits[circuit_id]
        except KeyError as error:
            raise NotFound(f"circuit_id {circuit_id} not found.") from error

        target.remove_metadata(key)
        target.sync()
        return jsonify("Operation successful"), 200
407
408 1
    @rest("/v2/evc/<circuit_id>/redeploy", methods=["PATCH"])
    def redeploy(self, circuit_id):
        """Endpoint to force the redeployment of an EVC.

        An enabled EVC has its current flows removed and is deployed again;
        a disabled EVC is left untouched.

        Returns:
            tuple: JSON response and 202 when the redeploy was carried out,
                or 409 when the circuit is disabled.

        Raises:
            NotFound: ``circuit_id`` is not in the circuits buffer.
        """
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            # Chain the real cause, not the exception class.
            raise NotFound(result) from error
        if evc.is_enabled():
            with evc.lock:
                evc.remove_current_flows()
                evc.deploy()
            result = {"response": f"Circuit {circuit_id} redeploy received."}
            status = 202
        else:
            result = {"response": f"Circuit {circuit_id} is disabled."}
            status = 409

        return jsonify(result), status
428
429 1
    @rest("/v2/evc/schedule", methods=["GET"])
    def list_schedules(self):
        """Endpoint that returns every schedule stored for all circuits.

        Return a JSON with the following template:
        [{"schedule_id": <schedule_id>,
         "circuit_id": <circuit_id>,
         "schedule": <schedule object>}]

        When no circuit is stored at all, an empty JSON object is returned
        instead of a list.
        """
        log.debug("list_schedules /v2/evc/schedule")
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
        if not circuits:
            return jsonify({}), 200

        # Circuits without a ``circuit_scheduler`` contribute no entries.
        schedules = [
            {
                "schedule_id": scheduler.get("id"),
                "circuit_id": circuit.get("id"),
                "schedule": scheduler,
            }
            for circuit in circuits
            for scheduler in circuit.get("circuit_scheduler") or ()
        ]

        log.debug("list_schedules result %s %s", schedules, 200)
        return jsonify(schedules), 200
460
461 1
    @rest("/v2/evc/schedule/", methods=["POST"])
    def create_schedule(self):
        """
        Create a new schedule for a given circuit.

        This service does not check if there are conflicts with another
        schedule.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }

        Returns:
            tuple: JSON with the new schedule as a dict, and 201.

        Raises:
            BadRequest: the payload is not a dictionary, or it lacks
                ``circuit_id`` or ``schedule``.
            NotFound: the circuit is not in the circuits buffer.
            Forbidden: the circuit is archived.
        """
        log.debug("create_schedule /v2/evc/schedule/")

        json_data = self._json_from_request("create_schedule")
        try:
            circuit_id = json_data["circuit_id"]
        except TypeError as error:
            result = "The payload should have a dictionary."
            log.debug("create_schedule result %s %s", result, 400)
            # Chain the real cause, not the exception class.
            raise BadRequest(result) from error
        except KeyError as error:
            result = "Missing circuit_id."
            log.debug("create_schedule result %s %s", result, 400)
            raise BadRequest(result) from error

        try:
            schedule_data = json_data["schedule"]
        except KeyError as error:
            result = "Missing schedule data."
            log.debug("create_schedule result %s %s", result, 400)
            raise BadRequest(result) from error

        # Get EVC from circuits buffer
        circuits = self._get_circuits_buffer()

        # get the circuit
        evc = circuits.get(circuit_id)

        if not evc:
            result = f"circuit_id {circuit_id} not found"
            log.debug("create_schedule result %s %s", result, 404)
            # Not inside an ``except`` block: there is no cause to chain.
            raise NotFound(result)
        # Can not modify circuits deleted and archived
        if evc.archived:
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
            log.debug("create_schedule result %s %s", result, 403)
            raise Forbidden(result)

        # new schedule from dict
        new_schedule = CircuitSchedule.from_dict(schedule_data)

        # If there is no schedule, create the list
        if not evc.circuit_scheduler:
            evc.circuit_scheduler = []

        # Add the new schedule
        evc.circuit_scheduler.append(new_schedule)

        # Add schedule job
        self.sched.add_circuit_job(evc, new_schedule)

        # save circuit to mongodb
        evc.sync()

        result = new_schedule.as_dict()
        status = 201

        log.debug("create_schedule result %s %s", result, status)
        return jsonify(result), status
537
538 1
    @rest("/v2/evc/schedule/<schedule_id>", methods=["PATCH"])
    def update_schedule(self, schedule_id):
        """Update a schedule.

        Change all attributes from the given schedule from a EVC circuit.
        The schedule ID is preserved as default.
        Payload example:
            {
              "date": "2019-08-07T14:52:10.967Z",
              "interval": "string",
              "frequency": "1 * * *",
              "action": "create"
            }

        Returns:
            tuple: JSON with the updated schedule as a dict, and 200.

        Raises:
            NotFound: no circuit holds a schedule with ``schedule_id``.
            Forbidden: the owning circuit is archived.
        """
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)

        # Try to find a circuit schedule
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)

        if not found_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("update_schedule result %s %s", result, 404)
            # No active exception here, so there is no cause to chain.
            raise NotFound(result)
        # Can not modify circuits deleted and archived
        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("update_schedule result %s %s", result, 403)
            raise Forbidden(result)

        data = self._json_from_request("update_schedule")

        new_schedule = CircuitSchedule.from_dict(data)
        new_schedule.id = found_schedule.id
        # Remove the old schedule
        evc.circuit_scheduler.remove(found_schedule)
        # Append the modified schedule
        evc.circuit_scheduler.append(new_schedule)

        # Cancel the job of the old schedule
        self.sched.cancel_job(found_schedule.id)
        # Add the new circuit schedule
        self.sched.add_circuit_job(evc, new_schedule)
        # Save EVC to mongodb
        evc.sync()

        result = new_schedule.as_dict()
        status = 200

        log.debug("update_schedule result %s %s", result, status)
        return jsonify(result), status
588
589 1
    @rest("/v2/evc/schedule/<schedule_id>", methods=["DELETE"])
    def delete_schedule(self, schedule_id):
        """Remove a circuit schedule.

        The schedule is removed from its EVC, its job is cancelled in the
        scheduler and the EVC is synced afterwards.

        Raises:
            NotFound: no circuit holds a schedule with ``schedule_id``.
            Forbidden: the owning circuit is archived.
        """
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
        evc, schedule = self._find_evc_by_schedule_id(schedule_id)

        if not schedule:
            message = f"schedule_id {schedule_id} not found"
            log.debug("delete_schedule result %s %s", message, 404)
            raise NotFound(message)

        # Archived (deleted) circuits can not be modified.
        if evc.archived:
            message = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("delete_schedule result %s %s", message, 403)
            raise Forbidden(message)

        # Drop the schedule from the EVC and cancel its job.
        evc.circuit_scheduler.remove(schedule)
        self.sched.cancel_job(schedule.id)
        # Persist the EVC without the schedule.
        evc.sync()

        message = "Schedule removed"
        log.debug("delete_schedule result %s %s", message, 200)
        return jsonify(message), 200
624
625 1
    def _is_duplicated_evc(self, evc):
626
        """Verify if the circuit given is duplicated with the stored evcs.
627
628
        Args:
629
            evc (EVC): circuit to be analysed.
630
631
        Returns:
632
            boolean: True if the circuit is duplicated, otherwise False.
633
634
        """
635 1
        for circuit in tuple(self.circuits.values()):
636 1
            if not circuit.archived and circuit.shares_uni(evc):
637 1
                return True
638 1
        return False
639
640 1
    @listen_to("kytos/topology.link_up")
    def on_link_up(self, event):
        """Listen to link up (or end of maintenance) events.

        The event is handed over to ``handle_link_up``.
        """
        self.handle_link_up(event)
644
645 1
    def handle_link_up(self, event):
        """Let every enabled, non-archived EVC react to a link going up.

        Circuits are visited in service level order and each one handles
        the link while holding its own lock.
        """
        log.debug("Event handle_link_up %s", event)
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_enabled() or evc.archived:
                continue
            with evc.lock:
                evc.handle_link_up(event.content["link"])
652
653 1
    @listen_to("kytos/topology.link_down")
    def on_link_down(self, event):
        """Listen to link down (or under maintenance) events.

        The event is handed over to ``handle_link_down``.
        """
        self.handle_link_down(event)
657
658 1
    def handle_link_down(self, event):
        """Change circuits when a link is down or under maintenance.

        EVCs affected by the link are split in two groups:

        - those with a failover path not affected by the link: their
          failover flows are sent to flow_manager in batches, and then the
          current and failover paths are swapped;
        - the remaining ones: an ``evc_affected_by_link_down`` event is
          emitted for each, so they are handled the traditional way.

        EVCs not affected by the link get a new failover path set up when
        only their failover path is affected.
        """
        link = event.content["link"]
        log.info("Event handle_link_down %s", link)
        switch_flows = {}
        evcs_with_failover = []
        evcs_normal = []
        check_failover = []
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_affected_by_link(link):
                check_failover.append(evc)
                continue
            # if there is no failover path, handles link down the
            # traditional way
            if (
                not getattr(evc, 'failover_path', None) or
                evc.is_failover_path_affected_by_link(link)
            ):
                evcs_normal.append(evc)
                continue
            for dpid, flows in evc.get_failover_flows().items():
                switch_flows.setdefault(dpid, []).extend(flows)
            evcs_with_failover.append(evc)

        # Each round sends at most ``batch_size`` flows per switch. The
        # size stays constant because the pending lists are re-sliced on
        # every round; a non-positive BATCH_SIZE sends everything at once.
        batch_size = settings.BATCH_SIZE if settings.BATCH_SIZE > 0 else None
        while switch_flows:
            for dpid in list(switch_flows):
                pending = switch_flows[dpid]
                emit_event(
                    self.controller,
                    context="kytos.flow_manager",
                    name="flows.install",
                    dpid=dpid,
                    flow_dict={"flows": pending[:batch_size]},
                )
                if batch_size is None or batch_size >= len(pending):
                    del switch_flows[dpid]
                else:
                    switch_flows[dpid] = pending[batch_size:]
            time.sleep(settings.BATCH_INTERVAL)

        for evc in evcs_with_failover:
            with evc.lock:
                # The failover path becomes current and vice versa.
                old_path = evc.current_path
                evc.current_path = evc.failover_path
                evc.failover_path = old_path
                evc.sync()
            emit_event(self.controller, "redeployed_link_down", evc_id=evc.id)
            log.info(
                f"{evc} redeployed with failover due to link down {link.id}"
            )

        for evc in evcs_normal:
            emit_event(
                self.controller,
                "evc_affected_by_link_down",
                evc_id=evc.id,
                link_id=link.id,
            )

        # After handling the hot path, check if new failover paths are needed.
        # Note that EVCs affected by link down will generate a KytosEvent for
        # deployed|redeployed, which will trigger the failover path setup.
        # Thus, we just need to further check the check_failover list
        for evc in check_failover:
            if evc.is_failover_path_affected_by_link(link):
                evc.setup_failover_path()
727
728 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
    def on_evc_affected_by_link_down(self, event):
        """Listen to EVCs reported as affected by a link down.

        The event is handed over to ``handle_evc_affected_by_link_down``.
        """
        self.handle_evc_affected_by_link_down(event)
732
733 1
    def handle_evc_affected_by_link_down(self, event):
        """Make one EVC handle a link down and report the outcome.

        Emits ``redeployed_link_down`` when the EVC's ``handle_link_down``
        result is truthy and ``error_redeploy_link_down`` otherwise.
        Nothing is done for an ``evc_id`` missing from the buffer.
        """
        content = event.content
        evc = self.circuits.get(content["evc_id"])
        link_id = content['link_id']
        if not evc:
            return
        with evc.lock:
            redeployed = evc.handle_link_down()
        if redeployed:
            log.info(f"{evc} redeployed due to link down {link_id}")
            event_name = "redeployed_link_down"
        else:
            event_name = "error_redeploy_link_down"
        emit_event(self.controller, event_name, evc_id=evc.id)
746
747 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
    def on_evc_deployed(self, event):
        """Listen to EVC deployed|redeployed_link_up|redeployed_link_down.

        The event is handed over to ``handle_evc_deployed``.
        """
        self.handle_evc_deployed(event)
751
752 1
    def handle_evc_deployed(self, event):
        """Set up the failover path of the EVC named in the event.

        The setup runs while holding the EVC's lock; an ``evc_id`` missing
        from the circuits buffer is silently ignored.
        """
        evc = self.circuits.get(event.content["evc_id"])
        if evc:
            with evc.lock:
                evc.setup_failover_path()
759
760 1
    @listen_to("kytos/topology.topology_loaded")
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
        """Load the stored EVCs once the topology has been loaded.

        The event itself is not inspected.
        """
        self.load_all_evcs()
764
765 1
    def load_all_evcs(self):
        """Load every stored circuit that is not yet held in memory.

        Circuits are read from ``self.mongo_controller``; ids already
        present in ``self.circuits`` are skipped, the rest are handed to
        ``_load_evc``.
        """
        stored = self.mongo_controller.get_circuits()['circuits']
        for evc_id, evc_data in stored.items():
            if evc_id in self.circuits:
                continue
            self._load_evc(evc_data)
771
772 1
    def _load_evc(self, circuit_dict):
773
        """Load one EVC from mongodb to memory."""
774 1
        try:
775 1
            evc = self._evc_from_dict(circuit_dict)
776 1
        except ValueError as exception:
777 1
            log.error(
778
                f"Could not load EVC: dict={circuit_dict} error={exception}"
779
            )
780 1
            return None
781
782 1
        if evc.archived:
783 1
            return None
784 1
        evc.deactivate()
785 1
        evc.sync()
786 1
        self.circuits.setdefault(evc.id, evc)
787 1
        self.sched.add(evc)
788 1
        return evc
789
790 1
    @listen_to("kytos/flow_manager.flow.error")
    def on_flow_mod_error(self, event):
        """Listen for flow_manager flow errors.

        The event is forwarded unchanged to ``handle_flow_mod_error``.
        """
        self.handle_flow_mod_error(event)
794
795 1
    def handle_flow_mod_error(self, event):
        """Remove a circuit's current flows after a failed flow "add".

        ``event.content`` must contain ``flow``; ``error_command`` is
        optional. Only the ``"add"`` command is acted upon: the circuit is
        looked up from the flow cookie and, if known, its current flows
        are removed.
        """
        content = event.content
        # Mandatory key: looked up first so a missing flow raises KeyError.
        flow = content["flow"]
        if content.get("error_command") == "add":
            evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
            if evc:
                evc.remove_current_flows()
804
805 1
    def _evc_dict_with_instances(self, evc_dict):
806
        """Convert some dict values to instance of EVC classes.
807
808
        This method will convert: [UNI, Link]
809
        """
810 1
        data = evc_dict.copy()  # Do not modify the original dict
811 1
        for attribute, value in data.items():
812
            # Get multiple attributes.
813
            # Ex: uni_a, uni_z
814 1
            if "uni" in attribute:
815 1
                try:
816 1
                    data[attribute] = self._uni_from_dict(value)
817 1
                except ValueError as exception:
818 1
                    result = "Error creating UNI: Invalid value"
819 1
                    raise ValueError(result) from exception
820
821 1
            if attribute == "circuit_scheduler":
822 1
                data[attribute] = []
823 1
                for schedule in value:
824 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
825
826
            # Get multiple attributes.
827
            # Ex: primary_links,
828
            #     backup_links,
829
            #     current_links_cache,
830
            #     primary_links_cache,
831
            #     backup_links_cache
832 1
            if "links" in attribute:
833 1
                data[attribute] = [
834
                    self._link_from_dict(link) for link in value
835
                ]
836
837
            # Ex: current_path,
838
            #     primary_path,
839
            #     backup_path
840 1
            if "path" in attribute and attribute != "dynamic_backup_path":
841 1
                data[attribute] = Path(
842
                    [self._link_from_dict(link) for link in value]
843
                )
844
845 1
        return data
846
847 1
    def _evc_from_dict(self, evc_dict):
        """Return an EVC built from ``evc_dict``.

        The dict is first converted by ``_evc_dict_with_instances`` and the
        result is passed to ``EVC`` as keyword arguments.
        """
        return EVC(self.controller, **self._evc_dict_with_instances(evc_dict))
850
851 1
    def _uni_from_dict(self, uni_dict):
852
        """Return a UNI object from python dict."""
853 1
        if uni_dict is None:
854 1
            return False
855
856 1
        interface_id = uni_dict.get("interface_id")
857 1
        interface = self.controller.get_interface_by_id(interface_id)
858 1
        if interface is None:
859 1
            result = (
860
                "Error creating UNI:"
861
                + f"Could not instantiate interface {interface_id}"
862
            )
863 1
            raise ValueError(result) from ValueError
864
865 1
        tag_dict = uni_dict.get("tag", None)
866 1
        if tag_dict:
867 1
            tag = TAG.from_dict(tag_dict)
868
        else:
869 1
            tag = None
870 1
        uni = UNI(interface, tag)
871
872 1
        return uni
873
874 1
    def _link_from_dict(self, link_dict):
        """Build a Link from its dict representation.

        ``link_dict`` must hold ``endpoint_a`` and ``endpoint_b`` dicts with
        an ``id``; both interfaces are resolved through the controller. An
        optional ``metadata`` dict is copied onto the link, and a truthy
        ``s_vlan`` metadata entry is replaced by the TAG built from it.

        Raises ``ValueError`` when ``TAG.from_dict`` returns ``False``.
        """
        # Read both ids before touching the controller.
        interface_ids = [
            link_dict.get(endpoint).get("id")
            for endpoint in ("endpoint_a", "endpoint_b")
        ]
        interfaces = [
            self.controller.get_interface_by_id(interface_id)
            for interface_id in interface_ids
        ]

        new_link = Link(*interfaces)
        if "metadata" in link_dict:
            new_link.extend_metadata(link_dict.get("metadata"))

        s_vlan = new_link.get_metadata("s_vlan")
        if not s_vlan:
            return new_link

        parsed_tag = TAG.from_dict(s_vlan)
        if parsed_tag is False:
            error_msg = f"Could not instantiate tag from dict {s_vlan}"
            raise ValueError(error_msg)
        new_link.update_metadata("s_vlan", parsed_tag)
        return new_link
894
895 1
    def _find_evc_by_schedule_id(self, schedule_id):
896
        """
897
        Find an EVC and CircuitSchedule based on schedule_id.
898
899
        :param schedule_id: Schedule ID
900
        :return: EVC and Schedule
901
        """
902 1
        circuits = self._get_circuits_buffer()
903 1
        found_schedule = None
904 1
        evc = None
905
906
        # pylint: disable=unused-variable
907 1
        for c_id, circuit in circuits.items():
908 1
            for schedule in circuit.circuit_scheduler:
909 1
                if schedule.id == schedule_id:
910 1
                    found_schedule = schedule
911 1
                    evc = circuit
912 1
                    break
913 1
            if found_schedule:
914 1
                break
915 1
        return evc, found_schedule
916
917 1
    def _get_circuits_buffer(self):
918
        """
919
        Return the circuit buffer.
920
921
        If the buffer is empty, try to load data from mongodb.
922
        """
923 1
        if not self.circuits:
924
            # Load circuits from mongodb to buffer
925 1
            circuits = self.mongo_controller.get_circuits()['circuits']
926 1
            for c_id, circuit in circuits.items():
927 1
                evc = self._evc_from_dict(circuit)
928 1
                self.circuits[c_id] = evc
929 1
        return self.circuits
930
931 1
    @staticmethod
    def _json_from_request(caller):
        """Return a json from request.

        If it was not possible to get a json from the request, log, for
        debug, who was the caller and the error that occurred, and raise
        an Exception.

        :param caller: name of the calling endpoint, used only in log
            messages.
        :return: the object returned by ``request.get_json()``.
        :raises BadRequest: when ``request.get_json()`` raises
            ``ValueError`` or ``BadRequest``.
        :raises UnsupportedMediaType: when ``request.get_json()`` returns
            ``None``.
        """
        try:
            json_data = request.get_json()
        except ValueError as exception:
            log.error(exception)
            log.debug(f"{caller} result {exception} 400")
            # Chain from the caught error, not from the BadRequest class.
            raise BadRequest(str(exception)) from exception
        except BadRequest as exception:
            result = "The request is not a valid JSON."
            log.debug(f"{caller} result {result} 400")
            # Keep the original parsing failure as the cause.
            raise BadRequest(result) from exception
        if json_data is None:
            result = "Content-Type must be application/json"
            log.debug(f"{caller} result {result} 415")
            raise UnsupportedMediaType(result)
        return json_data
954