Passed
Pull Request — master (#226)
by
unknown
11:00 queued 03:52
created

build.main.Main.on_flow_delete()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1.037

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 4
rs 10
c 0
b 0
f 0
ccs 2
cts 3
cp 0.6667
cc 1
nop 2
crap 1.037
1
# pylint: disable=protected-access
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import time
7 1
from threading import Lock
8
9 1
from flask import jsonify, request
10 1
from werkzeug.exceptions import (BadRequest, Conflict, Forbidden,
11
                                 MethodNotAllowed, NotFound,
12
                                 UnsupportedMediaType)
13
14 1
from kytos.core import KytosNApp, log, rest
15 1
from kytos.core.events import KytosEvent
16 1
from kytos.core.helpers import listen_to
17 1
from kytos.core.interface import TAG, UNI
18 1
from kytos.core.link import Link
19 1
from napps.kytos.mef_eline import controllers, settings
20 1
from napps.kytos.mef_eline.exceptions import InvalidPath
21 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
22
                                          Path)
23 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
24 1
from napps.kytos.mef_eline.utils import emit_event, load_spec, validate
25
26
27
# pylint: disable=too-many-public-methods
28 1
class Main(KytosNApp):
29
    """Main class of amlight/mef_eline NApp.
30
31
    This class is the entry point for this napp.
32
    """
33
34 1
    spec = load_spec()
35
36 1
    def setup(self):
        """Replace the '__init__' method for the KytosNApp subclass.

        The setup method is automatically called by the controller when your
        application is loaded.

        So, if you have any setup routine, insert it here.

        This implementation creates the scheduler and the mongodb controller,
        registers the controller on DynamicPathManager, creates the in-memory
        circuit buffer and its lock, requests the execute() loop and finally
        loads the stored EVCs.
        """
        # object used to schedule circuit events
        self.sched = Scheduler()

        # object to save and load circuits
        self.mongo_controller = self.get_eline_controller()
        self.mongo_controller.bootstrap_indexes()

        # set the controller that will manage the dynamic paths
        DynamicPathManager.set_controller(self.controller)

        # dictionary of EVCs created. It acts as a circuit buffer.
        # Every create/update/delete must be synced to mongodb.
        self.circuits = {}

        # lock taken by execute() around the consistency routine
        self._lock = Lock()
        # ask the framework to call execute() in a loop, using
        # settings.DEPLOY_EVCS_INTERVAL as the interval
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

        self.load_all_evcs()
62
63 1
    def get_evcs_by_svc_level(self) -> list:
        """Return the buffered circuits ordered by priority.

        Circuits with the highest service_level come first; circuits with the
        same service_level are ordered by ascending creation_time.

        In the future, as more ops are offloaded it should be get from the DB.
        """
        def priority(evc):
            # negate the level so that a plain ascending sort yields the
            # highest service level first
            return (-evc.service_level, evc.creation_time)

        ordered = list(self.circuits.values())
        ordered.sort(key=priority)
        return ordered
70
71 1
    @staticmethod
72 1
    def get_eline_controller():
        """Return the ELineController instance.

        The controller is built by calling controllers.ELineController() with
        no arguments; setup() stores the result as self.mongo_controller.
        """
        return controllers.ELineController()
75
76 1
    def execute(self):
        """Run one round of the consistency routine.

        setup() requests this method to be called in a loop. If a previous
        round still holds the lock, this call returns immediately instead of
        queueing behind it.
        """
        # Acquire without blocking: checking locked() and then entering
        # "with self._lock" leaves a window where another caller can take the
        # lock in between, making this call block instead of skipping.
        if not self._lock.acquire(blocking=False):
            return
        try:
            log.debug("Starting consistency routine")
            self.execute_consistency()
        finally:
            self._lock.release()
        log.debug("Finished consistency routine")
84
85 1
    def execute_consistency(self):
        """Execute consistency routine.

        Three steps are performed:

        1. collect the enabled but inactive circuits that are not locked and
           had no recent flow removal nor recent update;
        2. check their traces: the ones that pass are activated and synced,
           the others are redeployed once they failed more than
           settings.WAIT_FOR_OLD_PATH rounds;
        3. load every circuit returned by mongodb that is missing from the
           in-memory buffer.
        """
        circuits_to_check = {}
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
        for circuit in self.get_evcs_by_svc_level():
            # whatever is left in stored_circuits afterwards is not loaded
            stored_circuits.pop(circuit.id, None)
            if not circuit.is_enabled() or circuit.is_active():
                continue
            if circuit.lock.locked() or circuit.flow_removed_recent():
                continue
            if circuit.updated_recent():
                continue
            circuits_to_check[circuit.id] = circuit

        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
        for circuit_id, is_checked in circuits_checked.items():
            circuit = circuits_to_check[circuit_id]
            if not is_checked:
                circuit.execution_rounds += 1
                if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
                    log.info(f"{circuit} enabled but inactive - redeploy")
                    with circuit.lock:
                        circuit.deploy()
                continue
            circuit.execution_rounds = 0
            log.info(f"{circuit} enabled but inactive - activating")
            with circuit.lock:
                circuit.activate()
                circuit.sync()

        for circuit_id, circuit_dict in stored_circuits.items():
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
            self._load_evc(circuit_dict)
117
118 1
    def shutdown(self):
        """Execute when your napp is unloaded.

        If you have some cleanup procedure, insert it here.

        Currently nothing is cleaned up: the method body is empty.
        """
123
124 1
    @rest("/v2/evc/", methods=["GET"])
125 1
    def list_circuits(self):
        """Endpoint to return the circuits stored in mongodb.

        The "archived" query argument selects what is listed: "true" and
        "false" filter by the archived flag and "null" is passed on as None.
        When the argument is missing or has any other value, False is used,
        so by default only non archived evcs are listed.
        """
        log.debug("list_circuits /v2/evc")
        flag = request.args.get("archived", "false").lower()
        if flag == "null":
            archived = None
        elif flag == "true":
            archived = True
        else:
            archived = False
        stored = self.mongo_controller.get_circuits(archived=archived)
        return jsonify(stored['circuits']), 200
138
139 1
    @rest("/v2/evc/<circuit_id>", methods=["GET"])
140 1
    def get_circuit(self, circuit_id):
        """Endpoint to return a circuit based on id.

        The circuit is read from mongodb. NotFound (404) is raised when the
        lookup returns nothing.
        """
        log.debug("get_circuit /v2/evc/%s", circuit_id)
        circuit = self.mongo_controller.get_circuit(circuit_id)
        if circuit:
            log.debug("get_circuit result %s %s", circuit, 200)
            return jsonify(circuit), 200

        result = f"circuit_id {circuit_id} not found"
        log.debug("get_circuit result %s %s", result, 404)
        raise NotFound(result)
151
152 1
    @rest("/v2/evc/", methods=["POST"])
153 1
    @validate(spec)
154 1
    def create_circuit(self, data):
        """Try to create a new circuit.

        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
        UNI_Z's requested C-VID are available from the interfaces' pools. This
        is checked when creating the UNI object.

        Then, E-Line NApp requests a primary and a backup path to the
        Pathfinder NApp using the attributes primary_links and backup_links
        submitted via REST

        # For each link composing paths in #3:
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
        #  - Using the S-VID obtained, generate abstract flow entries to be
        #    sent to FlowManager

        Push abstract flow entries to FlowManager and FlowManager pushes
        OpenFlow entries to datapaths

        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
        creation

        Finally, notify user of the status of its request.

        Args:
            data: request payload, as handed over by the validate decorator.

        Returns:
            A (response, 201) tuple whose JSON body is {"circuit_id": <id>}.

        Raises:
            BadRequest: the payload can not be turned into an EVC, a given
                path is invalid, or the EVC has neither a primary path nor
                dynamic paths while its UNIs are on different switches.
            Conflict: a non archived EVC sharing a UNI already exists.
        """
        # Try to create the circuit object
        log.debug("create_circuit /v2/evc/")

        try:
            evc = self._evc_from_dict(data)
        except ValueError as exception:
            log.debug("create_circuit result %s %s", exception, 400)
            # chain to the real error, not to the BadRequest class
            raise BadRequest(str(exception)) from exception

        # both static paths, when given, go through the same validation
        for path_name in ("primary_path", "backup_path"):
            path = getattr(evc, path_name)
            if not path:
                continue
            try:
                path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise BadRequest(
                    f"{path_name} is not valid: {exception}"
                ) from exception

        # verify duplicated evc
        if self._is_duplicated_evc(evc):
            result = "The EVC already exists."
            log.debug("create_circuit result %s %s", result, 409)
            raise Conflict(result)

        if (
            not evc.primary_path
            and evc.dynamic_backup_path is False
            and evc.uni_a.interface.switch != evc.uni_z.interface.switch
        ):
            result = "The EVC must have a primary path or allow dynamic paths."
            log.debug("create_circuit result %s %s", result, 400)
            raise BadRequest(result)

        # store circuit in dictionary
        self.circuits[evc.id] = evc

        # save circuit
        evc.sync()

        # Schedule the circuit deploy
        self.sched.add(evc)

        # Circuit has no schedule, deploy now
        if not evc.circuit_scheduler:
            with evc.lock:
                evc.deploy()

        # Notify users
        event = KytosEvent(
            name="kytos.mef_eline.created", content=evc.as_dict()
        )
        self.controller.buffers.app.put(event)

        result = {"circuit_id": evc.id}
        status = 201
        log.debug("create_circuit result %s %s", result, status)
        emit_event(self.controller, "created", evc_id=evc.id)
        return jsonify(result), status
250
251 1
    @listen_to('kytos/flow_manager.flow.removed')
252 1
    def on_flow_delete(self, event):
        """Capture delete messages to keep track when flows got removed.

        Thin listener: the event is forwarded unchanged to
        handle_flow_delete().
        """
        self.handle_flow_delete(event)
255
256 1
    def handle_flow_delete(self, event):
        """Record a flow removal on the EVC that owns the removed flow.

        The EVC id is derived from the cookie of event.content["flow"]; flows
        whose cookie maps to no buffered circuit are ignored.
        """
        cookie = event.content["flow"].cookie
        evc = self.circuits.get(EVC.get_id_from_cookie(cookie))
        if not evc:
            return
        log.debug("Flow removed in EVC %s", evc.id)
        evc.flow_removed()
263
264 1
    @rest("/v2/evc/<circuit_id>", methods=["PATCH"])
265 1
    def update(self, circuit_id):
        """Update a circuit based on payload.

        The EVC required attributes (name, uni_a, uni_z) can't be updated.

        Args:
            circuit_id: id of the circuit, taken from the URL.

        Returns:
            A (response, 200) tuple whose JSON body maps the EVC id to its
            dict representation.

        Raises:
            NotFound: circuit_id is not in the circuits buffer.
            MethodNotAllowed: the EVC is archived.
            BadRequest: the body is not valid JSON or evc.update() rejected
                the new values.
            UnsupportedMediaType: the body could not be read as JSON (no
                JSON mimetype).
        """
        log.debug("update /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as exception:
            result = f"circuit_id {circuit_id} not found"
            log.debug("update result %s %s", result, 404)
            # chain to the real cause instead of the exception class
            raise NotFound(result) from exception

        if evc.archived:
            result = "Can't update archived EVC"
            log.debug("update result %s %s", result, 405)
            raise MethodNotAllowed(["GET"], result)

        try:
            data = request.get_json()
        except BadRequest as exception:
            result = "The request body is not a well-formed JSON."
            log.debug("update result %s %s", result, 400)
            raise BadRequest(result) from exception
        if data is None:
            result = "The request body mimetype is not application/json."
            log.debug("update result %s %s", result, 415)
            # not inside an except block: there is no cause to chain to
            raise UnsupportedMediaType(result)

        try:
            enable, redeploy = evc.update(
                **self._evc_dict_with_instances(data)
            )
        except ValueError as exception:
            log.error(exception)
            log.debug("update result %s %s", exception, 400)
            raise BadRequest(str(exception)) from exception

        if evc.is_active():
            if enable is False:  # disable if active
                with evc.lock:
                    evc.remove()
            elif redeploy is not None:  # redeploy if active
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        else:
            if enable is True:  # enable if inactive
                with evc.lock:
                    evc.deploy()
        result = {evc.id: evc.as_dict()}
        status = 200

        log.debug("update result %s %s", result, status)
        emit_event(self.controller, "updated", evc_id=evc.id, data=data)
        return jsonify(result), status
321
322 1
    @rest("/v2/evc/<circuit_id>", methods=["DELETE"])
323 1
    def delete_circuit(self, circuit_id):
        """Remove a circuit.

        First, the flows are removed from the switches, and then the EVC is
        disabled. The EVC is finally archived and synced, so it stays in
        the buffer flagged as archived.

        Args:
            circuit_id: id of the circuit, taken from the URL.

        Returns:
            A (response, 200) tuple with a confirmation message.

        Raises:
            NotFound: circuit_id is not in the circuits buffer or the EVC is
                already archived.
        """
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as exception:
            result = f"circuit_id {circuit_id} not found"
            log.debug("delete_circuit result %s %s", result, 404)
            # chain to the real cause instead of the exception class
            raise NotFound(result) from exception

        if evc.archived:
            result = f"Circuit {circuit_id} already removed"
            log.debug("delete_circuit result %s %s", result, 404)
            # not inside an except block: there is no cause to chain to
            raise NotFound(result)

        log.info("Removing %s", evc)
        with evc.lock:
            evc.remove_current_flows()
            evc.remove_failover_flows(sync=False)
            evc.deactivate()
            evc.disable()
            self.sched.remove(evc)
            evc.archive()
            evc.sync()
        log.info("EVC removed. %s", evc)
        result = {"response": f"Circuit {circuit_id} removed"}
        status = 200

        log.debug("delete_circuit result %s %s", result, status)
        emit_event(self.controller, "deleted", evc_id=evc.id)
        return jsonify(result), status
358
359 1
    @rest("v2/evc/<circuit_id>/metadata", methods=["GET"])
360 1
    def get_metadata(self, circuit_id):
        """Return the metadata of an EVC as {"metadata": ...} with status 200.

        Raises NotFound when a KeyError happens while building the response,
        which is the case when circuit_id is not in the circuits buffer.
        """
        try:
            payload = {"metadata": self.circuits[circuit_id].metadata}
            response = jsonify(payload)
        except KeyError as error:
            raise NotFound(f"circuit_id {circuit_id} not found.") from error
        return response, 200
369
370 1
    @rest("v2/evc/<circuit_id>/metadata", methods=["POST"])
371 1
    def add_metadata(self, circuit_id):
        """Add the metadata sent in the request body to an EVC.

        Raises BadRequest for a malformed JSON or an empty body (no content
        type), UnsupportedMediaType when no metadata could be read and
        NotFound when circuit_id is not in the circuits buffer. On success
        the EVC is synced and status 201 is returned.
        """
        try:
            metadata = request.get_json()
            content_type = request.content_type
        except BadRequest as error:
            raise BadRequest(
                "The request body is not a well-formed JSON."
            ) from error

        if content_type is None:
            raise BadRequest("The request body is empty.")

        if metadata is None:
            result = (
                "Metadata is empty."
                if content_type == "application/json"
                else (
                    "The content type must be application/json "
                    f"(received {content_type})."
                )
            )
            raise UnsupportedMediaType(result)

        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise NotFound(f"circuit_id {circuit_id} not found.") from error

        evc.extend_metadata(metadata)
        evc.sync()
        return jsonify("Operation successful"), 201
400
401 1
    @rest("v2/evc/<circuit_id>/metadata/<key>", methods=["DELETE"])
402 1
    def delete_metadata(self, circuit_id, key):
        """Remove one metadata key from an EVC and sync the EVC.

        Raises NotFound when circuit_id is not in the circuits buffer.
        """
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise NotFound(f"circuit_id {circuit_id} not found.") from error
        else:
            evc.remove_metadata(key)
            evc.sync()
        return jsonify("Operation successful"), 200
412
413 1
    @rest("/v2/evc/<circuit_id>/redeploy", methods=["PATCH"])
414 1
    def redeploy(self, circuit_id):
        """Endpoint to force the redeployment of an EVC.

        Args:
            circuit_id: id of the circuit, taken from the URL.

        Returns:
            (response, 202) when the EVC is enabled: its current flows are
            removed and it is deployed again. (response, 409) when the EVC
            is disabled; nothing is changed in that case.

        Raises:
            NotFound: circuit_id is not in the circuits buffer.
        """
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as exception:
            result = f"circuit_id {circuit_id} not found"
            # chain to the real cause instead of the exception class
            raise NotFound(result) from exception
        if evc.is_enabled():
            with evc.lock:
                evc.remove_current_flows()
                evc.deploy()
            result = {"response": f"Circuit {circuit_id} redeploy received."}
            status = 202
        else:
            result = {"response": f"Circuit {circuit_id} is disabled."}
            status = 409

        return jsonify(result), status
433
434 1
    @rest("/v2/evc/schedule", methods=["GET"])
435 1
    def list_schedules(self):
        """Endpoint to return all schedules stored for all circuits.

        Return a JSON with the following template:
        [{"schedule_id": <schedule_id>,
         "circuit_id": <circuit_id>,
         "schedule": <schedule object>}]

        When mongodb returns no circuit at all, an empty JSON object is
        returned instead of a list.
        """
        log.debug("list_schedules /v2/evc/schedule")
        stored = self.mongo_controller.get_circuits()['circuits'].values()
        if not stored:
            return jsonify({}), 200

        status = 200
        # circuits without "circuit_scheduler" (or with an empty one)
        # contribute no entry
        result = [
            {
                "schedule_id": schedule.get("id"),
                "circuit_id": circuit.get("id"),
                "schedule": schedule,
            }
            for circuit in stored
            for schedule in circuit.get("circuit_scheduler") or ()
        ]

        log.debug("list_schedules result %s %s", result, status)
        return jsonify(result), status
465
466 1
    @rest("/v2/evc/schedule/", methods=["POST"])
467 1
    def create_schedule(self):
        """
        Create a new schedule for a given circuit.

        This service does not check if there are conflicts with another
        schedule.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }

        Returns:
            A (response, 201) tuple whose JSON body is the new schedule.

        Raises:
            BadRequest: the payload is not a dictionary or misses
                "circuit_id" or "schedule".
            NotFound: the circuit is not in the circuits buffer.
            Forbidden: the circuit is archived.
        """
        log.debug("create_schedule /v2/evc/schedule/")

        json_data = self._json_from_request("create_schedule")
        try:
            circuit_id = json_data["circuit_id"]
        except TypeError as exception:
            result = "The payload should have a dictionary."
            log.debug("create_schedule result %s %s", result, 400)
            # chain to the real cause instead of the exception class
            raise BadRequest(result) from exception
        except KeyError as exception:
            result = "Missing circuit_id."
            log.debug("create_schedule result %s %s", result, 400)
            raise BadRequest(result) from exception

        try:
            schedule_data = json_data["schedule"]
        except KeyError as exception:
            result = "Missing schedule data."
            log.debug("create_schedule result %s %s", result, 400)
            raise BadRequest(result) from exception

        # Get EVC from circuits buffer
        circuits = self._get_circuits_buffer()

        # get the circuit
        evc = circuits.get(circuit_id)

        # the circuit must exist (not inside an except block, so there is no
        # cause to chain to)
        if not evc:
            result = f"circuit_id {circuit_id} not found"
            log.debug("create_schedule result %s %s", result, 404)
            raise NotFound(result)
        # Can not modify circuits deleted and archived
        if evc.archived:
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
            log.debug("create_schedule result %s %s", result, 403)
            raise Forbidden(result)

        # new schedule from dict
        new_schedule = CircuitSchedule.from_dict(schedule_data)

        # If there is no schedule, create the list
        if not evc.circuit_scheduler:
            evc.circuit_scheduler = []

        # Add the new schedule
        evc.circuit_scheduler.append(new_schedule)

        # Add schedule job
        self.sched.add_circuit_job(evc, new_schedule)

        # save circuit to mongodb
        evc.sync()

        result = new_schedule.as_dict()
        status = 201

        log.debug("create_schedule result %s %s", result, status)
        return jsonify(result), status
542
543 1
    @rest("/v2/evc/schedule/<schedule_id>", methods=["PATCH"])
544 1
    def update_schedule(self, schedule_id):
        """Update a schedule.

        Change all attributes from the given schedule from a EVC circuit.
        The schedule ID is preserved as default.
        Payload example:
            {
              "date": "2019-08-07T14:52:10.967Z",
              "interval": "string",
              "frequency": "1 * * * *",
              "action": "create"
            }

        Args:
            schedule_id: id of the schedule, taken from the URL.

        Returns:
            A (response, 200) tuple whose JSON body is the updated schedule.

        Raises:
            NotFound: no circuit has a schedule with this id.
            Forbidden: the circuit owning the schedule is archived.
        """
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)

        # Try to find a circuit schedule
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)

        # The schedule must exist. These raises are not inside an except
        # block, so there is no cause to chain to.
        if not found_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("update_schedule result %s %s", result, 404)
            raise NotFound(result)
        # Can not modify circuits deleted and archived
        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("update_schedule result %s %s", result, 403)
            raise Forbidden(result)

        data = self._json_from_request("update_schedule")

        new_schedule = CircuitSchedule.from_dict(data)
        # keep the id of the schedule being replaced
        new_schedule.id = found_schedule.id
        # Remove the old schedule
        evc.circuit_scheduler.remove(found_schedule)
        # Append the modified schedule
        evc.circuit_scheduler.append(new_schedule)

        # Cancel all schedule jobs
        self.sched.cancel_job(found_schedule.id)
        # Add the new circuit schedule
        self.sched.add_circuit_job(evc, new_schedule)
        # Save EVC to mongodb
        evc.sync()

        result = new_schedule.as_dict()
        status = 200

        log.debug("update_schedule result %s %s", result, status)
        return jsonify(result), status
593
594 1
    @rest("/v2/evc/schedule/<schedule_id>", methods=["DELETE"])
595 1
    def delete_schedule(self, schedule_id):
        """Remove a circuit schedule.

        The schedule is removed from its EVC, its job is cancelled on the
        scheduler and the EVC is synced (saved to mongodb).

        Raises NotFound when no schedule has this id and Forbidden when the
        owning circuit is archived.
        """
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)

        if not found_schedule:
            not_found = f"schedule_id {schedule_id} not found"
            log.debug("delete_schedule result %s %s", not_found, 404)
            raise NotFound(not_found)

        # Can not modify circuits deleted and archived
        if evc.archived:
            forbidden = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("delete_schedule result %s %s", forbidden, 403)
            raise Forbidden(forbidden)

        # drop the schedule from the EVC, then cancel its job
        evc.circuit_scheduler.remove(found_schedule)
        self.sched.cancel_job(found_schedule.id)
        # Save EVC to mongodb
        evc.sync()

        log.debug("delete_schedule result %s %s", "Schedule removed", 200)
        return jsonify("Schedule removed"), 200
629
630 1
    def _is_duplicated_evc(self, evc):
631
        """Verify if the circuit given is duplicated with the stored evcs.
632
633
        Args:
634
            evc (EVC): circuit to be analysed.
635
636
        Returns:
637
            boolean: True if the circuit is duplicated, otherwise False.
638
639
        """
640 1
        for circuit in tuple(self.circuits.values()):
641 1
            if not circuit.archived and circuit.shares_uni(evc):
642 1
                return True
643 1
        return False
644
645 1
    @listen_to("kytos/topology.link_up")
646 1
    def on_link_up(self, event):
        """Change circuit when link is up or end_maintenance.

        Thin listener: the event is forwarded unchanged to handle_link_up().
        """
        self.handle_link_up(event)
649
650 1
    def handle_link_up(self, event):
        """Let every enabled, non archived EVC react to a link coming up.

        EVCs are visited in service level order and each one handles
        event.content["link"] while holding its own lock.
        """
        log.debug("Event handle_link_up %s", event)
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_enabled() or evc.archived:
                continue
            with evc.lock:
                evc.handle_link_up(event.content["link"])
657
658 1
    @listen_to("kytos/topology.link_down")
659 1
    def on_link_down(self, event):
        """Change circuit when link is down or under maintenance.

        Thin listener: the event is forwarded unchanged to
        handle_link_down().
        """
        self.handle_link_down(event)
662
663 1
    def handle_link_down(self, event):
        """Change circuit when link is down or under maintenance.

        EVCs affected by the link are split in two groups:

        - the ones with a failover path that is not affected by the link:
          their failover flows are sent to flow_manager in batches and then
          current_path and failover_path are swapped;
        - the others: an "evc_affected_by_link_down" event is emitted for
          each one, so they are handled the traditional way.

        EVCs not affected by the link get a new failover path when their
        failover path used the link.

        Flows are sent in rounds: on each round at most settings.BATCH_SIZE
        flows are sent per switch, followed by a sleep of
        settings.BATCH_INTERVAL. A BATCH_SIZE of zero (or less) sends
        everything in a single round.
        """
        link = event.content["link"]
        log.info("Event handle_link_down %s", link)
        switch_flows = {}
        evcs_with_failover = []
        evcs_normal = []
        check_failover = []
        for evc in self.get_evcs_by_svc_level():
            if evc.is_affected_by_link(link):
                # if there is no usable failover path, handles link down the
                # traditional way
                if (
                    not getattr(evc, 'failover_path', None) or
                    evc.is_failover_path_affected_by_link(link)
                ):
                    evcs_normal.append(evc)
                    continue
                for dpid, flows in evc.get_failover_flows().items():
                    switch_flows.setdefault(dpid, []).extend(flows)
                evcs_with_failover.append(evc)
            else:
                check_failover.append(evc)

        # The batch size is constant. Each round consumes the head of every
        # per-switch list, so the slice bound must not grow between rounds
        # (an accumulated offset would send B, 2B, 3B... flows). None means
        # "no batching" and also protects against a negative setting, which
        # would never drain the lists.
        batch_size = settings.BATCH_SIZE if settings.BATCH_SIZE > 0 else None
        while switch_flows:
            for dpid in list(switch_flows):
                pending = switch_flows[dpid]
                emit_event(
                    self.controller,
                    context="kytos.flow_manager",
                    name="flows.install",
                    dpid=dpid,
                    flow_dict={"flows": pending[:batch_size]},
                )
                if batch_size is None or batch_size >= len(pending):
                    del switch_flows[dpid]
                    continue
                switch_flows[dpid] = pending[batch_size:]
            time.sleep(settings.BATCH_INTERVAL)

        for evc in evcs_with_failover:
            with evc.lock:
                # the failover path becomes the current one and vice versa
                old_path = evc.current_path
                evc.current_path = evc.failover_path
                evc.failover_path = old_path
                evc.sync()
            emit_event(self.controller, "redeployed_link_down", evc_id=evc.id)
            log.info(
                f"{evc} redeployed with failover due to link down {link.id}"
            )

        for evc in evcs_normal:
            emit_event(
                self.controller,
                "evc_affected_by_link_down",
                evc_id=evc.id,
                link_id=link.id,
            )

        # After handling the hot path, check if new failover paths are needed.
        # Note that EVCs affected by link down will generate a KytosEvent for
        # deployed|redeployed, which will trigger the failover path setup.
        # Thus, we just need to further check the check_failover list
        for evc in check_failover:
            if evc.is_failover_path_affected_by_link(link):
                evc.setup_failover_path()
732
733 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
734 1
    def on_evc_affected_by_link_down(self, event):
        """Change circuit when link is down or under maintenance.

        Thin listener: the event is forwarded unchanged to
        handle_evc_affected_by_link_down().
        """
        self.handle_evc_affected_by_link_down(event)
737
738 1
    def handle_evc_affected_by_link_down(self, event):
        """Handle a link down on one EVC and report the outcome.

        The EVC is looked up by event.content["evc_id"]; unknown ids are
        ignored. The EVC handles the link down while holding its lock, then
        "redeployed_link_down" is emitted when that returned a truthy value
        and "error_redeploy_link_down" otherwise.
        """
        evc = self.circuits.get(event.content["evc_id"])
        link_id = event.content['link_id']
        if not evc:
            return

        with evc.lock:
            result = evc.handle_link_down()

        if result:
            log.info(f"{evc} redeployed due to link down {link_id}")
            event_name = "redeployed_link_down"
        else:
            event_name = "error_redeploy_link_down"
        emit_event(self.controller, event_name, evc_id=evc.id)
751
752 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
753 1
    def on_evc_deployed(self, event):
        """Handle EVC deployed|redeployed_link_up|redeployed_link_down.

        Thin listener: the event is forwarded unchanged to
        handle_evc_deployed().
        """
        self.handle_evc_deployed(event)
756
757 1
    def handle_evc_deployed(self, event):
        """Set up the failover path of the EVC named in the event.

        The EVC is looked up in ``self.circuits`` by ``event.content["evc_id"]``;
        unknown ids are ignored. The setup runs while holding ``evc.lock``.
        """
        circuit = self.circuits.get(event.content["evc_id"])
        if circuit:
            with circuit.lock:
                circuit.setup_failover_path()
764
765 1
    @listen_to("kytos/topology.topology_loaded")
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
        """Load every stored EVC when the topology_loaded event arrives.

        The event payload itself is not used.
        """
        self.load_all_evcs()
769
770 1
    def load_all_evcs(self):
        """Load from mongodb every circuit that is not yet in memory.

        Circuits whose id is already a key of ``self.circuits`` are skipped;
        the remaining ones are handed to ``_load_evc``.
        """
        stored = self.mongo_controller.get_circuits()['circuits']
        for circuit_id, circuit_dict in stored.items():
            if circuit_id in self.circuits:
                continue
            self._load_evc(circuit_dict)
776
777 1
    def _load_evc(self, circuit_dict):
778
        """Load one EVC from mongodb to memory."""
779 1
        try:
780 1
            evc = self._evc_from_dict(circuit_dict)
781 1
        except ValueError as exception:
782 1
            log.error(
783
                f"Could not load EVC: dict={circuit_dict} error={exception}"
784
            )
785 1
            return None
786
787 1
        if evc.archived:
788 1
            return None
789 1
        evc.deactivate()
790 1
        evc.sync()
791 1
        self.circuits.setdefault(evc.id, evc)
792 1
        self.sched.add(evc)
793 1
        return evc
794
795 1
    @listen_to("kytos/flow_manager.flow.error")
    def on_flow_mod_error(self, event):
        """Listen for flow_manager flow errors.

        The received event is forwarded unchanged to
        ``handle_flow_mod_error``.
        """
        self.handle_flow_mod_error(event)
799
800 1
    def handle_flow_mod_error(self, event):
        """Remove the current flows of the EVC whose flow add failed.

        Only events whose ``error_command`` is ``"add"`` are acted upon. The
        EVC is resolved from the cookie of ``event.content["flow"]``; when
        no such EVC is known, nothing happens.
        """
        content = event.content
        flow = content["flow"]
        if content.get("error_command") == "add":
            circuit = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
            if circuit:
                circuit.remove_current_flows()
809
810 1
    def _evc_dict_with_instances(self, evc_dict):
811
        """Convert some dict values to instance of EVC classes.
812
813
        This method will convert: [UNI, Link]
814
        """
815 1
        data = evc_dict.copy()  # Do not modify the original dict
816 1
        for attribute, value in data.items():
817
            # Get multiple attributes.
818
            # Ex: uni_a, uni_z
819 1
            if "uni" in attribute:
820 1
                try:
821 1
                    data[attribute] = self._uni_from_dict(value)
822 1
                except ValueError as exception:
823 1
                    result = "Error creating UNI: Invalid value"
824 1
                    raise ValueError(result) from exception
825
826 1
            if attribute == "circuit_scheduler":
827 1
                data[attribute] = []
828 1
                for schedule in value:
829 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
830
831
            # Get multiple attributes.
832
            # Ex: primary_links,
833
            #     backup_links,
834
            #     current_links_cache,
835
            #     primary_links_cache,
836
            #     backup_links_cache
837 1
            if "links" in attribute:
838 1
                data[attribute] = [
839
                    self._link_from_dict(link) for link in value
840
                ]
841
842
            # Ex: current_path,
843
            #     primary_path,
844
            #     backup_path
845 1
            if "path" in attribute and attribute != "dynamic_backup_path":
846 1
                data[attribute] = Path(
847
                    [self._link_from_dict(link) for link in value]
848
                )
849
850 1
        return data
851
852 1
    def _evc_from_dict(self, evc_dict):
        """Return an EVC built from its dict representation.

        The dict values are first converted by ``_evc_dict_with_instances``
        and then passed as keyword arguments to ``EVC``.
        """
        return EVC(self.controller, **self._evc_dict_with_instances(evc_dict))
855
856 1
    def _uni_from_dict(self, uni_dict):
857
        """Return a UNI object from python dict."""
858 1
        if uni_dict is None:
859 1
            return False
860
861 1
        interface_id = uni_dict.get("interface_id")
862 1
        interface = self.controller.get_interface_by_id(interface_id)
863 1
        if interface is None:
864 1
            result = (
865
                "Error creating UNI:"
866
                + f"Could not instantiate interface {interface_id}"
867
            )
868 1
            raise ValueError(result) from ValueError
869
870 1
        tag_dict = uni_dict.get("tag", None)
871 1
        if tag_dict:
872 1
            tag = TAG.from_dict(tag_dict)
873
        else:
874 1
            tag = None
875 1
        uni = UNI(interface, tag)
876
877 1
        return uni
878
879 1
    def _link_from_dict(self, link_dict):
        """Return a Link object built from its dict representation.

        Both endpoints are resolved through the controller by the "id" of
        "endpoint_a" and "endpoint_b". Metadata, when present in the dict,
        is copied onto the link. A truthy "s_vlan" metadata entry is
        replaced by the TAG created from it.

        Raises ValueError when ``TAG.from_dict`` returns False for s_vlan.
        """
        # Read both ids before any controller lookup, as the lookups
        # must not run when either endpoint entry is missing.
        interface_ids = [
            link_dict.get(key).get("id")
            for key in ("endpoint_a", "endpoint_b")
        ]
        endpoints = [
            self.controller.get_interface_by_id(interface_id)
            for interface_id in interface_ids
        ]

        link = Link(*endpoints)
        if "metadata" in link_dict:
            link.extend_metadata(link_dict.get("metadata"))

        s_vlan = link.get_metadata("s_vlan")
        if not s_vlan:
            return link

        tag = TAG.from_dict(s_vlan)
        if tag is False:
            raise ValueError(f"Could not instantiate tag from dict {s_vlan}")
        link.update_metadata("s_vlan", tag)
        return link
899
900 1
    def _find_evc_by_schedule_id(self, schedule_id):
901
        """
902
        Find an EVC and CircuitSchedule based on schedule_id.
903
904
        :param schedule_id: Schedule ID
905
        :return: EVC and Schedule
906
        """
907 1
        circuits = self._get_circuits_buffer()
908 1
        found_schedule = None
909 1
        evc = None
910
911
        # pylint: disable=unused-variable
912 1
        for c_id, circuit in circuits.items():
913 1
            for schedule in circuit.circuit_scheduler:
914 1
                if schedule.id == schedule_id:
915 1
                    found_schedule = schedule
916 1
                    evc = circuit
917 1
                    break
918 1
            if found_schedule:
919 1
                break
920 1
        return evc, found_schedule
921
922 1
    def _get_circuits_buffer(self):
923
        """
924
        Return the circuit buffer.
925
926
        If the buffer is empty, try to load data from mongodb.
927
        """
928 1
        if not self.circuits:
929
            # Load circuits from mongodb to buffer
930 1
            circuits = self.mongo_controller.get_circuits()['circuits']
931 1
            for c_id, circuit in circuits.items():
932 1
                evc = self._evc_from_dict(circuit)
933 1
                self.circuits[c_id] = evc
934 1
        return self.circuits
935
936 1
    @staticmethod
    def _json_from_request(caller):
        """Return the JSON payload of the current request.

        If it was not possible to get a json from the request, log, for
        debug, who was the caller and the error that occurred, and raise
        the matching HTTP exception.

        Args:
            caller: name of the calling endpoint, used only in log messages.

        Returns:
            The value returned by ``request.get_json()``.

        Raises:
            BadRequest: when ``get_json`` raises ValueError or BadRequest.
            UnsupportedMediaType: when ``get_json`` returns None.
        """
        try:
            json_data = request.get_json()
        except ValueError as exception:
            log.error(exception)
            log.debug(f"{caller} result {exception} 400")
            # Chain from the caught instance so the real cause is kept.
            raise BadRequest(str(exception)) from exception
        except BadRequest as exception:
            result = "The request is not a valid JSON."
            log.debug(f"{caller} result {result} 400")
            raise BadRequest(result) from exception
        if json_data is None:
            result = "Content-Type must be application/json"
            log.debug(f"{caller} result {result} 415")
            raise UnsupportedMediaType(result)
        return json_data
959