Passed
Pull Request — master (#226)
by Italo Valcy
03:22
created

build.main.Main.list_schedules()   B

Complexity

Conditions 5

Size

Total Lines 31
Code Lines 21

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 18
CRAP Score 5

Importance

Changes 0
Metric Value
cc 5
eloc 21
nop 1
dl 0
loc 31
ccs 18
cts 18
cp 1
crap 5
rs 8.9093
c 0
b 0
f 0
1
# pylint: disable=protected-access
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import time
7 1
from threading import Lock
8
9 1
from flask import jsonify, request
10 1
from werkzeug.exceptions import (BadRequest, Conflict, Forbidden,
11
                                 MethodNotAllowed, NotFound,
12
                                 UnsupportedMediaType)
13
14 1
from kytos.core import KytosNApp, log, rest
15 1
from kytos.core.events import KytosEvent
16 1
from kytos.core.helpers import listen_to
17 1
from kytos.core.interface import TAG, UNI
18 1
from kytos.core.link import Link
19 1
from napps.kytos.mef_eline import controllers, settings
20 1
from napps.kytos.mef_eline.exceptions import InvalidPath
21 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
22
                                          Path)
23 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
24 1
from napps.kytos.mef_eline.utils import emit_event, load_spec, validate
25
26
27
# pylint: disable=too-many-public-methods
28 1
class Main(KytosNApp):
29
    """Main class of amlight/mef_eline NApp.
30
31
    This class is the entry point for this napp.
32
    """
33
34 1
    spec = load_spec()
35
36 1
    def setup(self):
37
        """Replace the '__init__' method for the KytosNApp subclass.
38
39
        The setup method is automatically called by the controller when your
40
        application is loaded.
41
42
        So, if you have any setup routine, insert it here.
43
        """
44
        # object used to scheduler circuit events
45 1
        self.sched = Scheduler()
46
47
        # object to save and load circuits
48 1
        self.mongo_controller = self.get_eline_controller()
49 1
        self.mongo_controller.bootstrap_indexes()
50
51
        # set the controller that will manager the dynamic paths
52 1
        DynamicPathManager.set_controller(self.controller)
53
54
        # dictionary of EVCs created. It acts as a circuit buffer.
55
        # Every create/update/delete must be synced to mongodb.
56 1
        self.circuits = {}
57
58 1
        self._lock = Lock()
59 1
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)
60
61 1
        self.load_all_evcs()
62
63 1
    def get_evcs_by_svc_level(self) -> list:
64
        """Get circuits sorted by desc service level and asc creation_time.
65
66
        In the future, as more ops are offloaded it should be get from the DB.
67
        """
68 1
        return sorted(self.circuits.values(),
69
                      key=lambda x: (-x.service_level, x.creation_time))
70
71 1
    @staticmethod
72 1
    def get_eline_controller():
73
        """Return the ELineController instance."""
74
        return controllers.ELineController()
75
76 1
    def execute(self):
77
        """Execute once when the napp is running."""
78 1
        if self._lock.locked():
79 1
            return
80 1
        log.debug("Starting consistency routine")
81 1
        with self._lock:
82 1
            self.execute_consistency()
83 1
        log.debug("Finished consistency routine")
84
85 1
    def execute_consistency(self):
86
        """Execute consistency routine."""
87 1
        circuits_to_check = {}
88 1
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
89 1
        for circuit in self.get_evcs_by_svc_level():
90 1
            stored_circuits.pop(circuit.id, None)
91 1
            if (
92
                circuit.is_enabled()
93
                and not circuit.is_active()
94
                and not circuit.lock.locked()
95
                and not circuit.has_recent_removed_flow()
96
                and not circuit.is_recent_updated()
97
            ):
98 1
                circuits_to_check[circuit.id] = circuit
99 1
        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
100 1
        for circuit_id, is_checked in circuits_checked.items():
101 1
            circuit = circuits_to_check[circuit_id]
102 1
            if is_checked:
103 1
                circuit.execution_rounds = 0
104 1
                log.info(f"{circuit} enabled but inactive - activating")
105 1
                with circuit.lock:
106 1
                    circuit.activate()
107 1
                    circuit.sync()
108
            else:
109 1
                circuit.execution_rounds += 1
110 1
                if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
111 1
                    log.info(f"{circuit} enabled but inactive - redeploy")
112 1
                    with circuit.lock:
113 1
                        circuit.deploy()
114 1
        for circuit_id in stored_circuits:
115 1
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
116 1
            self._load_evc(stored_circuits[circuit_id])
117
118 1
    def shutdown(self):
119
        """Execute when your napp is unloaded.
120
121
        If you have some cleanup procedure, insert it here.
122
        """
123
124 1
    @rest("/v2/evc/", methods=["GET"])
125 1
    def list_circuits(self):
126
        """Endpoint to return circuits stored.
127
128
        archive query arg if defined (not null) will be filtered
129
        accordingly, by default only non archived evcs will be listed
130
        """
131 1
        log.debug("list_circuits /v2/evc")
132 1
        archived = request.args.get("archived", "false").lower()
133 1
        archived_to_optional = {"null": None, "true": True, "false": False}
134 1
        archived = archived_to_optional.get(archived, False)
135 1
        circuits = self.mongo_controller.get_circuits(archived=archived)
136 1
        circuits = circuits['circuits']
137 1
        return jsonify(circuits), 200
138
139 1
    @rest("/v2/evc/<circuit_id>", methods=["GET"])
140 1
    def get_circuit(self, circuit_id):
141
        """Endpoint to return a circuit based on id."""
142 1
        log.debug("get_circuit /v2/evc/%s", circuit_id)
143 1
        circuit = self.mongo_controller.get_circuit(circuit_id)
144 1
        if not circuit:
145 1
            result = f"circuit_id {circuit_id} not found"
146 1
            log.debug("get_circuit result %s %s", result, 404)
147 1
            raise NotFound(result)
148 1
        status = 200
149 1
        log.debug("get_circuit result %s %s", circuit, status)
150 1
        return jsonify(circuit), status
151
152 1
    @rest("/v2/evc/", methods=["POST"])
153 1
    @validate(spec)
154 1
    def create_circuit(self, data):
155
        """Try to create a new circuit.
156
157
        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
158
        UNI_Z's requested C-VID are available from the interfaces' pools. This
159
        is checked when creating the UNI object.
160
161
        Then, E-Line NApp requests a primary and a backup path to the
162
        Pathfinder NApp using the attributes primary_links and backup_links
163
        submitted via REST
164
165
        # For each link composing paths in #3:
166
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
167
        #  - Using the S-VID obtained, generate abstract flow entries to be
168
        #    sent to FlowManager
169
170
        Push abstract flow entries to FlowManager and FlowManager pushes
171
        OpenFlow entries to datapaths
172
173
        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
174
        creation
175
176
        Finnaly, notify user of the status of its request.
177
        """
178
        # Try to create the circuit object
179 1
        log.debug("create_circuit /v2/evc/")
180
181 1
        try:
182 1
            evc = self._evc_from_dict(data)
183 1
        except ValueError as exception:
184 1
            log.debug("create_circuit result %s %s", exception, 400)
185 1
            raise BadRequest(str(exception)) from BadRequest
186
187 1
        if evc.primary_path:
188
            try:
189
                evc.primary_path.is_valid(
190
                    evc.uni_a.interface.switch,
191
                    evc.uni_z.interface.switch,
192
                    bool(evc.circuit_scheduler),
193
                )
194
            except InvalidPath as exception:
195
                raise BadRequest(
196
                    f"primary_path is not valid: {exception}"
197
                ) from exception
198 1
        if evc.backup_path:
199
            try:
200
                evc.backup_path.is_valid(
201
                    evc.uni_a.interface.switch,
202
                    evc.uni_z.interface.switch,
203
                    bool(evc.circuit_scheduler),
204
                )
205
            except InvalidPath as exception:
206
                raise BadRequest(
207
                    f"backup_path is not valid: {exception}"
208
                ) from exception
209
210
        # verify duplicated evc
211 1
        if self._is_duplicated_evc(evc):
212 1
            result = "The EVC already exists."
213 1
            log.debug("create_circuit result %s %s", result, 409)
214 1
            raise Conflict(result)
215
216 1
        try:
217 1
            evc._validate_has_primary_or_dynamic()
218 1
        except ValueError as exception:
219 1
            raise BadRequest(str(exception)) from exception
220
221
        # store circuit in dictionary
222 1
        self.circuits[evc.id] = evc
223
224
        # save circuit
225 1
        evc.sync()
226
227
        # Schedule the circuit deploy
228 1
        self.sched.add(evc)
229
230
        # Circuit has no schedule, deploy now
231 1
        if not evc.circuit_scheduler:
232 1
            with evc.lock:
233 1
                evc.deploy()
234
235
        # Notify users
236 1
        event = KytosEvent(
237
            name="kytos.mef_eline.created", content=evc.as_dict()
238
        )
239 1
        self.controller.buffers.app.put(event)
240
241 1
        result = {"circuit_id": evc.id}
242 1
        status = 201
243 1
        log.debug("create_circuit result %s %s", result, status)
244 1
        emit_event(self.controller, "created", evc_id=evc.id)
245 1
        return jsonify(result), status
246
247 1
    @listen_to('kytos/flow_manager.flow.removed')
248 1
    def on_flow_delete(self, event):
249
        """Capture delete messages to keep track when flows got removed."""
250
        self.handle_flow_delete(event)
251
252 1
    def handle_flow_delete(self, event):
253
        """Keep track when the EVC got flows removed by deriving its cookie."""
254 1
        flow = event.content["flow"]
255 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
256 1
        if evc:
257 1
            log.debug("Flow removed in EVC %s", evc.id)
258 1
            evc.set_flow_removed_at()
259
260 1
    @rest("/v2/evc/<circuit_id>", methods=["PATCH"])
261 1
    def update(self, circuit_id):
262
        """Update a circuit based on payload.
263
264
        The EVC required attributes (name, uni_a, uni_z) can't be updated.
265
        """
266 1
        log.debug("update /v2/evc/%s", circuit_id)
267 1
        try:
268 1
            evc = self.circuits[circuit_id]
269 1
        except KeyError:
270 1
            result = f"circuit_id {circuit_id} not found"
271 1
            log.debug("update result %s %s", result, 404)
272 1
            raise NotFound(result) from NotFound
273
274 1
        if evc.archived:
275 1
            result = "Can't update archived EVC"
276 1
            log.debug("update result %s %s", result, 405)
277 1
            raise MethodNotAllowed(["GET"], result)
278
279 1
        try:
280 1
            data = request.get_json()
281 1
        except BadRequest:
282 1
            result = "The request body is not a well-formed JSON."
283 1
            log.debug("update result %s %s", result, 400)
284 1
            raise BadRequest(result) from BadRequest
285 1
        if data is None:
286 1
            result = "The request body mimetype is not application/json."
287 1
            log.debug("update result %s %s", result, 415)
288 1
            raise UnsupportedMediaType(result) from UnsupportedMediaType
289
290 1
        try:
291 1
            enable, redeploy = evc.update(
292
                **self._evc_dict_with_instances(data)
293
            )
294 1
        except ValueError as exception:
295 1
            log.error(exception)
296 1
            log.debug("update result %s %s", exception, 400)
297 1
            raise BadRequest(str(exception)) from BadRequest
298
299 1
        if evc.is_active():
300
            if enable is False:  # disable if active
301
                with evc.lock:
302
                    evc.remove()
303
            elif redeploy is not None:  # redeploy if active
304
                with evc.lock:
305
                    evc.remove()
306
                    evc.deploy()
307
        else:
308 1
            if enable is True:  # enable if inactive
309 1
                with evc.lock:
310 1
                    evc.deploy()
311 1
        result = {evc.id: evc.as_dict()}
312 1
        status = 200
313
314 1
        log.debug("update result %s %s", result, status)
315 1
        emit_event(self.controller, "updated", evc_id=evc.id, data=data)
316 1
        return jsonify(result), status
317
318 1
    @rest("/v2/evc/<circuit_id>", methods=["DELETE"])
319 1
    def delete_circuit(self, circuit_id):
320
        """Remove a circuit.
321
322
        First, the flows are removed from the switches, and then the EVC is
323
        disabled.
324
        """
325 1
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
326 1
        try:
327 1
            evc = self.circuits[circuit_id]
328 1
        except KeyError:
329 1
            result = f"circuit_id {circuit_id} not found"
330 1
            log.debug("delete_circuit result %s %s", result, 404)
331 1
            raise NotFound(result) from NotFound
332
333 1
        if evc.archived:
334 1
            result = f"Circuit {circuit_id} already removed"
335 1
            log.debug("delete_circuit result %s %s", result, 404)
336 1
            raise NotFound(result) from NotFound
337
338 1
        log.info("Removing %s", evc)
339 1
        with evc.lock:
340 1
            evc.remove_current_flows()
341 1
            evc.remove_failover_flows(sync=False)
342 1
            evc.deactivate()
343 1
            evc.disable()
344 1
            self.sched.remove(evc)
345 1
            evc.archive()
346 1
            evc.sync()
347 1
        log.info("EVC removed. %s", evc)
348 1
        result = {"response": f"Circuit {circuit_id} removed"}
349 1
        status = 200
350
351 1
        log.debug("delete_circuit result %s %s", result, status)
352 1
        emit_event(self.controller, "deleted", evc_id=evc.id)
353 1
        return jsonify(result), status
354
355 1
    @rest("v2/evc/<circuit_id>/metadata", methods=["GET"])
356 1
    def get_metadata(self, circuit_id):
357
        """Get metadata from an EVC."""
358 1
        try:
359 1
            return (
360
                jsonify({"metadata": self.circuits[circuit_id].metadata}),
361
                200,
362
            )
363
        except KeyError as error:
364
            raise NotFound(f"circuit_id {circuit_id} not found.") from error
365
366 1
    @rest("v2/evc/<circuit_id>/metadata", methods=["POST"])
367 1
    def add_metadata(self, circuit_id):
368
        """Add metadata to an EVC."""
369 1
        try:
370 1
            metadata = request.get_json()
371 1
            content_type = request.content_type
372 1
        except BadRequest as error:
373 1
            result = "The request body is not a well-formed JSON."
374 1
            raise BadRequest(result) from error
375 1
        if content_type is None:
376 1
            result = "The request body is empty."
377 1
            raise BadRequest(result)
378 1
        if metadata is None:
379 1
            if content_type != "application/json":
380 1
                result = (
381
                    "The content type must be application/json "
382
                    f"(received {content_type})."
383
                )
384
            else:
385
                result = "Metadata is empty."
386 1
            raise UnsupportedMediaType(result)
387
388 1
        try:
389 1
            evc = self.circuits[circuit_id]
390 1
        except KeyError as error:
391 1
            raise NotFound(f"circuit_id {circuit_id} not found.") from error
392
393 1
        evc.extend_metadata(metadata)
394 1
        evc.sync()
395 1
        return jsonify("Operation successful"), 201
396
397 1
    @rest("v2/evc/<circuit_id>/metadata/<key>", methods=["DELETE"])
398 1
    def delete_metadata(self, circuit_id, key):
399
        """Delete metadata from an EVC."""
400 1
        try:
401 1
            evc = self.circuits[circuit_id]
402 1
        except KeyError as error:
403 1
            raise NotFound(f"circuit_id {circuit_id} not found.") from error
404
405 1
        evc.remove_metadata(key)
406 1
        evc.sync()
407 1
        return jsonify("Operation successful"), 200
408
409 1
    @rest("/v2/evc/<circuit_id>/redeploy", methods=["PATCH"])
410 1
    def redeploy(self, circuit_id):
411
        """Endpoint to force the redeployment of an EVC."""
412 1
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
413 1
        try:
414 1
            evc = self.circuits[circuit_id]
415 1
        except KeyError:
416 1
            result = f"circuit_id {circuit_id} not found"
417 1
            raise NotFound(result) from NotFound
418 1
        if evc.is_enabled():
419 1
            with evc.lock:
420 1
                evc.remove_current_flows()
421 1
                evc.deploy()
422 1
            result = {"response": f"Circuit {circuit_id} redeploy received."}
423 1
            status = 202
424
        else:
425 1
            result = {"response": f"Circuit {circuit_id} is disabled."}
426 1
            status = 409
427
428 1
        return jsonify(result), status
429
430 1
    @rest("/v2/evc/schedule", methods=["GET"])
431 1
    def list_schedules(self):
432
        """Endpoint to return all schedules stored for all circuits.
433
434
        Return a JSON with the following template:
435
        [{"schedule_id": <schedule_id>,
436
         "circuit_id": <circuit_id>,
437
         "schedule": <schedule object>}]
438
        """
439 1
        log.debug("list_schedules /v2/evc/schedule")
440 1
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
441 1
        if not circuits:
442 1
            result = {}
443 1
            status = 200
444 1
            return jsonify(result), status
445
446 1
        result = []
447 1
        status = 200
448 1
        for circuit in circuits:
449 1
            circuit_scheduler = circuit.get("circuit_scheduler")
450 1
            if circuit_scheduler:
451 1
                for scheduler in circuit_scheduler:
452 1
                    value = {
453
                        "schedule_id": scheduler.get("id"),
454
                        "circuit_id": circuit.get("id"),
455
                        "schedule": scheduler,
456
                    }
457 1
                    result.append(value)
458
459 1
        log.debug("list_schedules result %s %s", result, status)
460 1
        return jsonify(result), status
461
462 1
    @rest("/v2/evc/schedule/", methods=["POST"])
463 1
    def create_schedule(self):
464
        """
465
        Create a new schedule for a given circuit.
466
467
        This service do no check if there are conflicts with another schedule.
468
        Payload example:
469
            {
470
              "circuit_id":"aa:bb:cc",
471
              "schedule": {
472
                "date": "2019-08-07T14:52:10.967Z",
473
                "interval": "string",
474
                "frequency": "1 * * * *",
475
                "action": "create"
476
              }
477
            }
478
        """
479 1
        log.debug("create_schedule /v2/evc/schedule/")
480
481 1
        json_data = self._json_from_request("create_schedule")
482 1
        try:
483 1
            circuit_id = json_data["circuit_id"]
484 1
        except TypeError:
485 1
            result = "The payload should have a dictionary."
486 1
            log.debug("create_schedule result %s %s", result, 400)
487 1
            raise BadRequest(result) from BadRequest
488 1
        except KeyError:
489 1
            result = "Missing circuit_id."
490 1
            log.debug("create_schedule result %s %s", result, 400)
491 1
            raise BadRequest(result) from BadRequest
492
493 1
        try:
494 1
            schedule_data = json_data["schedule"]
495 1
        except KeyError:
496 1
            result = "Missing schedule data."
497 1
            log.debug("create_schedule result %s %s", result, 400)
498 1
            raise BadRequest(result) from BadRequest
499
500
        # Get EVC from circuits buffer
501 1
        circuits = self._get_circuits_buffer()
502
503
        # get the circuit
504 1
        evc = circuits.get(circuit_id)
505
506
        # get the circuit
507 1
        if not evc:
508 1
            result = f"circuit_id {circuit_id} not found"
509 1
            log.debug("create_schedule result %s %s", result, 404)
510 1
            raise NotFound(result) from NotFound
511
        # Can not modify circuits deleted and archived
512 1
        if evc.archived:
513 1
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
514 1
            log.debug("create_schedule result %s %s", result, 403)
515 1
            raise Forbidden(result) from Forbidden
516
517
        # new schedule from dict
518 1
        new_schedule = CircuitSchedule.from_dict(schedule_data)
519
520
        # If there is no schedule, create the list
521 1
        if not evc.circuit_scheduler:
522 1
            evc.circuit_scheduler = []
523
524
        # Add the new schedule
525 1
        evc.circuit_scheduler.append(new_schedule)
526
527
        # Add schedule job
528 1
        self.sched.add_circuit_job(evc, new_schedule)
529
530
        # save circuit to mongodb
531 1
        evc.sync()
532
533 1
        result = new_schedule.as_dict()
534 1
        status = 201
535
536 1
        log.debug("create_schedule result %s %s", result, status)
537 1
        return jsonify(result), status
538
539 1
    @rest("/v2/evc/schedule/<schedule_id>", methods=["PATCH"])
540 1
    def update_schedule(self, schedule_id):
541
        """Update a schedule.
542
543
        Change all attributes from the given schedule from a EVC circuit.
544
        The schedule ID is preserved as default.
545
        Payload example:
546
            {
547
              "date": "2019-08-07T14:52:10.967Z",
548
              "interval": "string",
549
              "frequency": "1 * * *",
550
              "action": "create"
551
            }
552
        """
553 1
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)
554
555
        # Try to find a circuit schedule
556 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
557
558
        # Can not modify circuits deleted and archived
559 1
        if not found_schedule:
560 1
            result = f"schedule_id {schedule_id} not found"
561 1
            log.debug("update_schedule result %s %s", result, 404)
562 1
            raise NotFound(result) from NotFound
563 1
        if evc.archived:
564 1
            result = f"Circuit {evc.id} is archived. Update is forbidden."
565 1
            log.debug("update_schedule result %s %s", result, 403)
566 1
            raise Forbidden(result) from Forbidden
567
568 1
        data = self._json_from_request("update_schedule")
569
570 1
        new_schedule = CircuitSchedule.from_dict(data)
571 1
        new_schedule.id = found_schedule.id
572
        # Remove the old schedule
573 1
        evc.circuit_scheduler.remove(found_schedule)
574
        # Append the modified schedule
575 1
        evc.circuit_scheduler.append(new_schedule)
576
577
        # Cancel all schedule jobs
578 1
        self.sched.cancel_job(found_schedule.id)
579
        # Add the new circuit schedule
580 1
        self.sched.add_circuit_job(evc, new_schedule)
581
        # Save EVC to mongodb
582 1
        evc.sync()
583
584 1
        result = new_schedule.as_dict()
585 1
        status = 200
586
587 1
        log.debug("update_schedule result %s %s", result, status)
588 1
        return jsonify(result), status
589
590 1
    @rest("/v2/evc/schedule/<schedule_id>", methods=["DELETE"])
591 1
    def delete_schedule(self, schedule_id):
592
        """Remove a circuit schedule.
593
594
        Remove the Schedule from EVC.
595
        Remove the Schedule from cron job.
596
        Save the EVC to the Storehouse.
597
        """
598 1
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
599 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
600
601
        # Can not modify circuits deleted and archived
602 1
        if not found_schedule:
603 1
            result = f"schedule_id {schedule_id} not found"
604 1
            log.debug("delete_schedule result %s %s", result, 404)
605 1
            raise NotFound(result)
606
607 1
        if evc.archived:
608 1
            result = f"Circuit {evc.id} is archived. Update is forbidden."
609 1
            log.debug("delete_schedule result %s %s", result, 403)
610 1
            raise Forbidden(result)
611
612
        # Remove the old schedule
613 1
        evc.circuit_scheduler.remove(found_schedule)
614
615
        # Cancel all schedule jobs
616 1
        self.sched.cancel_job(found_schedule.id)
617
        # Save EVC to mongodb
618 1
        evc.sync()
619
620 1
        result = "Schedule removed"
621 1
        status = 200
622
623 1
        log.debug("delete_schedule result %s %s", result, status)
624 1
        return jsonify(result), status
625
626 1
    def _is_duplicated_evc(self, evc):
627
        """Verify if the circuit given is duplicated with the stored evcs.
628
629
        Args:
630
            evc (EVC): circuit to be analysed.
631
632
        Returns:
633
            boolean: True if the circuit is duplicated, otherwise False.
634
635
        """
636 1
        for circuit in tuple(self.circuits.values()):
637 1
            if not circuit.archived and circuit.shares_uni(evc):
638 1
                return True
639 1
        return False
640
641 1
    @listen_to("kytos/topology.link_up")
642 1
    def on_link_up(self, event):
643
        """Change circuit when link is up or end_maintenance."""
644
        self.handle_link_up(event)
645
646 1
    def handle_link_up(self, event):
647
        """Change circuit when link is up or end_maintenance."""
648 1
        log.debug("Event handle_link_up %s", event)
649 1
        for evc in self.get_evcs_by_svc_level():
650 1
            if evc.is_enabled() and not evc.archived:
651 1
                with evc.lock:
652 1
                    evc.handle_link_up(event.content["link"])
653
654 1
    @listen_to("kytos/topology.link_down")
655 1
    def on_link_down(self, event):
656
        """Change circuit when link is down or under_mantenance."""
657
        self.handle_link_down(event)
658
659 1
    def handle_link_down(self, event):
660
        """Change circuit when link is down or under_mantenance."""
661 1
        link = event.content["link"]
662 1
        log.info("Event handle_link_down %s", link)
663 1
        switch_flows = {}
664 1
        evcs_with_failover = []
665 1
        evcs_normal = []
666 1
        check_failover = []
667 1
        for evc in self.get_evcs_by_svc_level():
668 1
            if evc.is_affected_by_link(link):
669
                # if there is no failover path, handles link down the
670
                # tradditional way
671 1
                if (
672
                    not getattr(evc, 'failover_path', None) or
673
                    evc.is_failover_path_affected_by_link(link)
674
                ):
675 1
                    evcs_normal.append(evc)
676 1
                    continue
677 1
                for dpid, flows in evc.get_failover_flows().items():
678 1
                    switch_flows.setdefault(dpid, [])
679 1
                    switch_flows[dpid].extend(flows)
680 1
                evcs_with_failover.append(evc)
681
            else:
682 1
                check_failover.append(evc)
683
684 1
        offset = 0
685 1
        while switch_flows:
686 1
            offset = (offset + settings.BATCH_SIZE) or None
687 1
            switches = list(switch_flows.keys())
688 1
            for dpid in switches:
689 1
                emit_event(
690
                    self.controller,
691
                    context="kytos.flow_manager",
692
                    name="flows.install",
693
                    dpid=dpid,
694
                    flow_dict={"flows": switch_flows[dpid][:offset]},
695
                )
696 1
                if offset is None or offset >= len(switch_flows[dpid]):
697 1
                    del switch_flows[dpid]
698 1
                    continue
699 1
                switch_flows[dpid] = switch_flows[dpid][offset:]
700 1
            time.sleep(settings.BATCH_INTERVAL)
701
702 1
        for evc in evcs_with_failover:
703 1
            with evc.lock:
704 1
                old_path = evc.current_path
705 1
                evc.current_path = evc.failover_path
706 1
                evc.failover_path = old_path
707 1
                evc.sync()
708 1
            emit_event(self.controller, "redeployed_link_down", evc_id=evc.id)
709 1
            log.info(
710
                f"{evc} redeployed with failover due to link down {link.id}"
711
            )
712
713 1
        for evc in evcs_normal:
714 1
            emit_event(
715
                self.controller,
716
                "evc_affected_by_link_down",
717
                evc_id=evc.id,
718
                link_id=link.id,
719
            )
720
721
        # After handling the hot path, check if new failover paths are needed.
722
        # Note that EVCs affected by link down will generate a KytosEvent for
723
        # deployed|redeployed, which will trigger the failover path setup.
724
        # Thus, we just need to further check the check_failover list
725 1
        for evc in check_failover:
726 1
            if evc.is_failover_path_affected_by_link(link):
727 1
                evc.setup_failover_path()
728
729 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
730 1
    def on_evc_affected_by_link_down(self, event):
731
        """Change circuit when link is down or under_mantenance."""
732
        self.handle_evc_affected_by_link_down(event)
733
734 1
    def handle_evc_affected_by_link_down(self, event):
735
        """Change circuit when link is down or under_mantenance."""
736 1
        evc = self.circuits.get(event.content["evc_id"])
737 1
        link_id = event.content['link_id']
738 1
        if not evc:
739 1
            return
740 1
        with evc.lock:
741 1
            result = evc.handle_link_down()
742 1
        event_name = "error_redeploy_link_down"
743 1
        if result:
744 1
            log.info(f"{evc} redeployed due to link down {link_id}")
745 1
            event_name = "redeployed_link_down"
746 1
        emit_event(self.controller, event_name, evc_id=evc.id)
747
748 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
749 1
    def on_evc_deployed(self, event):
750
        """Handle EVC deployed|redeployed_link_down."""
751
        self.handle_evc_deployed(event)
752
753 1
    def handle_evc_deployed(self, event):
754
        """Setup failover path on evc deployed."""
755 1
        evc = self.circuits.get(event.content["evc_id"])
756 1
        if not evc:
757 1
            return
758 1
        with evc.lock:
759 1
            evc.setup_failover_path()
760
761 1
    @listen_to("kytos/topology.topology_loaded")
762 1
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
763
        """Load EVCs once the topology is available."""
764
        self.load_all_evcs()
765
766 1
    def load_all_evcs(self):
767
        """Try to load all EVCs on startup."""
768 1
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
769 1
        for circuit_id, circuit in circuits:
770 1
            if circuit_id not in self.circuits:
771 1
                self._load_evc(circuit)
772
773 1
    def _load_evc(self, circuit_dict):
774
        """Load one EVC from mongodb to memory."""
775 1
        try:
776 1
            evc = self._evc_from_dict(circuit_dict)
777 1
        except ValueError as exception:
778 1
            log.error(
779
                f"Could not load EVC: dict={circuit_dict} error={exception}"
780
            )
781 1
            return None
782
783 1
        if evc.archived:
784 1
            return None
785 1
        evc.deactivate()
786 1
        evc.sync()
787 1
        self.circuits.setdefault(evc.id, evc)
788 1
        self.sched.add(evc)
789 1
        return evc
790
791 1
    @listen_to("kytos/flow_manager.flow.error")
792 1
    def on_flow_mod_error(self, event):
793
        """Handle flow mod errors related to an EVC."""
794
        self.handle_flow_mod_error(event)
795
796 1
    def handle_flow_mod_error(self, event):
797
        """Handle flow mod errors related to an EVC."""
798 1
        flow = event.content["flow"]
799 1
        command = event.content.get("error_command")
800 1
        if command != "add":
801
            return
802 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
803 1
        if evc:
804 1
            evc.remove_current_flows()
805
806 1
    def _evc_dict_with_instances(self, evc_dict):
807
        """Convert some dict values to instance of EVC classes.
808
809
        This method will convert: [UNI, Link]
810
        """
811 1
        data = evc_dict.copy()  # Do not modify the original dict
812 1
        for attribute, value in data.items():
813
            # Get multiple attributes.
814
            # Ex: uni_a, uni_z
815 1
            if "uni" in attribute:
816 1
                try:
817 1
                    data[attribute] = self._uni_from_dict(value)
818 1
                except ValueError as exception:
819 1
                    result = "Error creating UNI: Invalid value"
820 1
                    raise ValueError(result) from exception
821
822 1
            if attribute == "circuit_scheduler":
823 1
                data[attribute] = []
824 1
                for schedule in value:
825 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
826
827
            # Get multiple attributes.
828
            # Ex: primary_links,
829
            #     backup_links,
830
            #     current_links_cache,
831
            #     primary_links_cache,
832
            #     backup_links_cache
833 1
            if "links" in attribute:
834 1
                data[attribute] = [
835
                    self._link_from_dict(link) for link in value
836
                ]
837
838
            # Ex: current_path,
839
            #     primary_path,
840
            #     backup_path
841 1
            if "path" in attribute and attribute != "dynamic_backup_path":
842 1
                data[attribute] = Path(
843
                    [self._link_from_dict(link) for link in value]
844
                )
845
846 1
        return data
847
848 1
    def _evc_from_dict(self, evc_dict):
849 1
        data = self._evc_dict_with_instances(evc_dict)
850 1
        return EVC(self.controller, **data)
851
852 1
    def _uni_from_dict(self, uni_dict):
853
        """Return a UNI object from python dict."""
854 1
        if uni_dict is None:
855 1
            return False
856
857 1
        interface_id = uni_dict.get("interface_id")
858 1
        interface = self.controller.get_interface_by_id(interface_id)
859 1
        if interface is None:
860 1
            result = (
861
                "Error creating UNI:"
862
                + f"Could not instantiate interface {interface_id}"
863
            )
864 1
            raise ValueError(result) from ValueError
865
866 1
        tag_dict = uni_dict.get("tag", None)
867 1
        if tag_dict:
868 1
            tag = TAG.from_dict(tag_dict)
869
        else:
870 1
            tag = None
871 1
        uni = UNI(interface, tag)
872
873 1
        return uni
874
875 1
    def _link_from_dict(self, link_dict):
876
        """Return a Link object from python dict."""
877 1
        id_a = link_dict.get("endpoint_a").get("id")
878 1
        id_b = link_dict.get("endpoint_b").get("id")
879
880 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
881 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
882 1
        if not endpoint_a:
883
            error_msg = f"Could not get interface endpoint_a id {id_a}"
884
            raise ValueError(error_msg)
885 1
        if not endpoint_b:
886
            error_msg = f"Could not get interface endpoint_b id {id_b}"
887
            raise ValueError(error_msg)
888
889 1
        link = Link(endpoint_a, endpoint_b)
890 1
        if "metadata" in link_dict:
891
            link.extend_metadata(link_dict.get("metadata"))
892
893 1
        s_vlan = link.get_metadata("s_vlan")
894 1
        if s_vlan:
895
            tag = TAG.from_dict(s_vlan)
896
            if tag is False:
897
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
898
                raise ValueError(error_msg)
899
            link.update_metadata("s_vlan", tag)
900 1
        return link
901
902 1
    def _find_evc_by_schedule_id(self, schedule_id):
903
        """
904
        Find an EVC and CircuitSchedule based on schedule_id.
905
906
        :param schedule_id: Schedule ID
907
        :return: EVC and Schedule
908
        """
909 1
        circuits = self._get_circuits_buffer()
910 1
        found_schedule = None
911 1
        evc = None
912
913
        # pylint: disable=unused-variable
914 1
        for c_id, circuit in circuits.items():
915 1
            for schedule in circuit.circuit_scheduler:
916 1
                if schedule.id == schedule_id:
917 1
                    found_schedule = schedule
918 1
                    evc = circuit
919 1
                    break
920 1
            if found_schedule:
921 1
                break
922 1
        return evc, found_schedule
923
924 1
    def _get_circuits_buffer(self):
925
        """
926
        Return the circuit buffer.
927
928
        If the buffer is empty, try to load data from mongodb.
929
        """
930 1
        if not self.circuits:
931
            # Load circuits from mongodb to buffer
932 1
            circuits = self.mongo_controller.get_circuits()['circuits']
933 1
            for c_id, circuit in circuits.items():
934 1
                evc = self._evc_from_dict(circuit)
935 1
                self.circuits[c_id] = evc
936 1
        return self.circuits
937
938 1
    @staticmethod
939 1
    def _json_from_request(caller):
940
        """Return a json from request.
941
942
        If it was not possible to get a json from the request, log, for debug,
943
        who was the caller and the error that ocurred, and raise an
944
        Exception.
945
        """
946 1
        try:
947 1
            json_data = request.get_json()
948
        except ValueError as exception:
949
            log.error(exception)
950
            log.debug(f"{caller} result {exception} 400")
951
            raise BadRequest(str(exception)) from BadRequest
952
        except BadRequest:
953
            result = "The request is not a valid JSON."
954
            log.debug(f"{caller} result {result} 400")
955
            raise BadRequest(result) from BadRequest
956 1
        if json_data is None:
957 1
            result = "Content-Type must be application/json"
958 1
            log.debug(f"{caller} result {result} 415")
959 1
            raise UnsupportedMediaType(result)
960
        return json_data
961