Passed
Pull Request — master (#280)
by
unknown
03:38
created

build.main.Main.execute_consistency()   C

Complexity

Conditions 9

Size

Total Lines 26
Code Lines 24

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 24
CRAP Score 9

Importance

Changes 0
Metric Value
eloc 24
dl 0
loc 26
ccs 24
cts 24
cp 1
rs 6.6666
c 0
b 0
f 0
cc 9
nop 1
crap 9
1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import time
7 1
from threading import Lock
8
9 1
from flask import jsonify, request
10 1
from werkzeug.exceptions import (BadRequest, Conflict, Forbidden,
11
                                 MethodNotAllowed, NotFound,
12
                                 UnsupportedMediaType)
13
14 1
from kytos.core import KytosNApp, log, rest
15 1
from kytos.core.helpers import listen_to
16 1
from kytos.core.interface import TAG, UNI
17 1
from kytos.core.link import Link
18 1
from napps.kytos.mef_eline import controllers, settings
19 1
from napps.kytos.mef_eline.exceptions import InvalidPath
20 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
21
                                          Path)
22 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
23 1
from napps.kytos.mef_eline.utils import (emit_event, load_spec,
24
                                         map_evc_event_content, validate)
25
26
27
# pylint: disable=too-many-public-methods
28 1
class Main(KytosNApp):
29
    """Main class of amlight/mef_eline NApp.
30
31
    This class is the entry point for this napp.
32
    """
33
34 1
    spec = load_spec()
35
36 1
    def setup(self):
37
        """Replace the '__init__' method for the KytosNApp subclass.
38
39
        The setup method is automatically called by the controller when your
40
        application is loaded.
41
42
        So, if you have any setup routine, insert it here.
43
        """
44
        # object used to scheduler circuit events
45 1
        self.sched = Scheduler()
46
47
        # object to save and load circuits
48 1
        self.mongo_controller = self.get_eline_controller()
49 1
        self.mongo_controller.bootstrap_indexes()
50
51
        # set the controller that will manager the dynamic paths
52 1
        DynamicPathManager.set_controller(self.controller)
53
54
        # dictionary of EVCs created. It acts as a circuit buffer.
55
        # Every create/update/delete must be synced to mongodb.
56 1
        self.circuits = {}
57
58 1
        self._lock = Lock()
59 1
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)
60
61 1
        self.load_all_evcs()
62
63 1
    def get_evcs_by_svc_level(self) -> list:
64
        """Get circuits sorted by desc service level and asc creation_time.
65
66
        In the future, as more ops are offloaded it should be get from the DB.
67
        """
68 1
        return sorted(self.circuits.values(),
69
                      key=lambda x: (-x.service_level, x.creation_time))
70
71 1
    @staticmethod
72 1
    def get_eline_controller():
73
        """Return the ELineController instance."""
74
        return controllers.ELineController()
75
76 1
    def execute(self):
77
        """Execute once when the napp is running."""
78 1
        if self._lock.locked():
79 1
            return
80 1
        log.debug("Starting consistency routine")
81 1
        with self._lock:
82 1
            self.execute_consistency()
83 1
        log.debug("Finished consistency routine")
84
85 1
    @staticmethod
86 1
    def should_be_checked(circuit):
87
        "Verify if the circuit meets the necessary conditions to be checked"
88
        # pylint: disable=too-many-boolean-expressions
89 1
        if (
90
                circuit.is_enabled()
91
                and not circuit.is_active()
92
                and not circuit.lock.locked()
93
                and not circuit.has_recent_removed_flow()
94
                and not circuit.is_recent_updated()
95
                # if a inter-switch EVC does not have current_path, it does not
96
                # make sense to run sdntrace on it
97
                and (circuit.is_intra_switch() or circuit.current_path)
98
                ):
99 1
            return True
100
        return False
101
102 1
    def execute_consistency(self):
103
        """Execute consistency routine."""
104 1
        circuits_to_check = []
105 1
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
106 1
        for circuit in self.get_evcs_by_svc_level():
107 1
            stored_circuits.pop(circuit.id, None)
108 1
            if self.should_be_checked(circuit):
109 1
                circuits_to_check.append(circuit)
110 1
        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
111 1
        for circuit in circuits_to_check:
112 1
            is_checked = circuits_checked.get(circuit.id)
113 1
            if is_checked:
114 1
                circuit.execution_rounds = 0
115 1
                log.info(f"{circuit} enabled but inactive - activating")
116 1
                with circuit.lock:
117 1
                    circuit.activate()
118 1
                    circuit.sync()
119
            else:
120 1
                circuit.execution_rounds += 1
121 1
                if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
122 1
                    log.info(f"{circuit} enabled but inactive - redeploy")
123 1
                    with circuit.lock:
124 1
                        circuit.deploy()
125 1
        for circuit_id in stored_circuits:
126 1
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
127 1
            self._load_evc(stored_circuits[circuit_id])
128
129 1
    def shutdown(self):
130
        """Execute when your napp is unloaded.
131
132
        If you have some cleanup procedure, insert it here.
133
        """
134
135 1
    @rest("/v2/evc/", methods=["GET"])
136 1
    def list_circuits(self):
137
        """Endpoint to return circuits stored.
138
139
        archive query arg if defined (not null) will be filtered
140
        accordingly, by default only non archived evcs will be listed
141
        """
142 1
        log.debug("list_circuits /v2/evc")
143 1
        archived = request.args.get("archived", "false").lower()
144 1
        archived_to_optional = {"null": None, "true": True, "false": False}
145 1
        archived = archived_to_optional.get(archived, False)
146 1
        circuits = self.mongo_controller.get_circuits(archived=archived)
147 1
        circuits = circuits['circuits']
148 1
        return jsonify(circuits), 200
149
150 1
    @rest("/v2/evc/<circuit_id>", methods=["GET"])
151 1
    def get_circuit(self, circuit_id):
152
        """Endpoint to return a circuit based on id."""
153 1
        log.debug("get_circuit /v2/evc/%s", circuit_id)
154 1
        circuit = self.mongo_controller.get_circuit(circuit_id)
155 1
        if not circuit:
156 1
            result = f"circuit_id {circuit_id} not found"
157 1
            log.debug("get_circuit result %s %s", result, 404)
158 1
            raise NotFound(result)
159 1
        status = 200
160 1
        log.debug("get_circuit result %s %s", circuit, status)
161 1
        return jsonify(circuit), status
162
163 1
    @rest("/v2/evc/", methods=["POST"])
164 1
    @validate(spec)
165 1
    def create_circuit(self, data):
166
        """Try to create a new circuit.
167
168
        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
169
        UNI_Z's requested C-VID are available from the interfaces' pools. This
170
        is checked when creating the UNI object.
171
172
        Then, E-Line NApp requests a primary and a backup path to the
173
        Pathfinder NApp using the attributes primary_links and backup_links
174
        submitted via REST
175
176
        # For each link composing paths in #3:
177
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
178
        #  - Using the S-VID obtained, generate abstract flow entries to be
179
        #    sent to FlowManager
180
181
        Push abstract flow entries to FlowManager and FlowManager pushes
182
        OpenFlow entries to datapaths
183
184
        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
185
        creation
186
187
        Finnaly, notify user of the status of its request.
188
        """
189
        # Try to create the circuit object
190 1
        log.debug("create_circuit /v2/evc/")
191
192 1
        try:
193 1
            evc = self._evc_from_dict(data)
194 1
        except ValueError as exception:
195 1
            log.debug("create_circuit result %s %s", exception, 400)
196 1
            raise BadRequest(str(exception)) from BadRequest
197
198 1
        if evc.primary_path:
199
            try:
200
                evc.primary_path.is_valid(
201
                    evc.uni_a.interface.switch,
202
                    evc.uni_z.interface.switch,
203
                    bool(evc.circuit_scheduler),
204
                )
205
            except InvalidPath as exception:
206
                raise BadRequest(
207
                    f"primary_path is not valid: {exception}"
208
                ) from exception
209 1
        if evc.backup_path:
210
            try:
211
                evc.backup_path.is_valid(
212
                    evc.uni_a.interface.switch,
213
                    evc.uni_z.interface.switch,
214
                    bool(evc.circuit_scheduler),
215
                )
216
            except InvalidPath as exception:
217
                raise BadRequest(
218
                    f"backup_path is not valid: {exception}"
219
                ) from exception
220
221
        # verify duplicated evc
222 1
        if self._is_duplicated_evc(evc):
223 1
            result = "The EVC already exists."
224 1
            log.debug("create_circuit result %s %s", result, 409)
225 1
            raise Conflict(result)
226
227 1
        try:
228 1
            evc._validate_has_primary_or_dynamic()
229 1
        except ValueError as exception:
230 1
            raise BadRequest(str(exception)) from exception
231
232
        # store circuit in dictionary
233 1
        self.circuits[evc.id] = evc
234
235
        # save circuit
236 1
        evc.sync()
237
238
        # Schedule the circuit deploy
239 1
        self.sched.add(evc)
240
241
        # Circuit has no schedule, deploy now
242 1
        if not evc.circuit_scheduler:
243 1
            with evc.lock:
244 1
                evc.deploy()
245
246
        # Notify users
247 1
        result = {"circuit_id": evc.id}
248 1
        status = 201
249 1
        log.debug("create_circuit result %s %s", result, status)
250 1
        emit_event(self.controller, name="created",
251
                   content=map_evc_event_content(evc))
252 1
        return jsonify(result), status
253
254 1
    @listen_to('kytos/flow_manager.flow.removed')
255 1
    def on_flow_delete(self, event):
256
        """Capture delete messages to keep track when flows got removed."""
257
        self.handle_flow_delete(event)
258
259 1
    def handle_flow_delete(self, event):
260
        """Keep track when the EVC got flows removed by deriving its cookie."""
261 1
        flow = event.content["flow"]
262 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
263 1
        if evc:
264 1
            log.debug("Flow removed in EVC %s", evc.id)
265 1
            evc.set_flow_removed_at()
266
267 1
    @rest("/v2/evc/<circuit_id>", methods=["PATCH"])
268 1
    def update(self, circuit_id):
269
        """Update a circuit based on payload.
270
271
        The EVC required attributes (name, uni_a, uni_z) can't be updated.
272
        """
273 1
        log.debug("update /v2/evc/%s", circuit_id)
274 1
        try:
275 1
            evc = self.circuits[circuit_id]
276 1
        except KeyError:
277 1
            result = f"circuit_id {circuit_id} not found"
278 1
            log.debug("update result %s %s", result, 404)
279 1
            raise NotFound(result) from NotFound
280
281 1
        if evc.archived:
282 1
            result = "Can't update archived EVC"
283 1
            log.debug("update result %s %s", result, 405)
284 1
            raise MethodNotAllowed(["GET"], result)
285
286 1
        try:
287 1
            data = request.get_json()
288 1
        except BadRequest:
289 1
            result = "The request body is not a well-formed JSON."
290 1
            log.debug("update result %s %s", result, 400)
291 1
            raise BadRequest(result) from BadRequest
292 1
        if data is None:
293 1
            result = "The request body mimetype is not application/json."
294 1
            log.debug("update result %s %s", result, 415)
295 1
            raise UnsupportedMediaType(result) from UnsupportedMediaType
296
297 1
        try:
298 1
            enable, redeploy = evc.update(
299
                **self._evc_dict_with_instances(data)
300
            )
301 1
        except ValueError as exception:
302 1
            log.error(exception)
303 1
            log.debug("update result %s %s", exception, 400)
304 1
            raise BadRequest(str(exception)) from BadRequest
305
306 1
        if evc.is_active():
307
            if enable is False:  # disable if active
308
                with evc.lock:
309
                    evc.remove()
310
            elif redeploy is not None:  # redeploy if active
311
                with evc.lock:
312
                    evc.remove()
313
                    evc.deploy()
314
        else:
315 1
            if enable is True:  # enable if inactive
316 1
                with evc.lock:
317 1
                    evc.deploy()
318 1
        result = {evc.id: evc.as_dict()}
319 1
        status = 200
320
321 1
        log.debug("update result %s %s", result, status)
322 1
        emit_event(self.controller, "updated",
323
                   content=map_evc_event_content(evc, **data))
324 1
        return jsonify(result), status
325
326 1
    @rest("/v2/evc/<circuit_id>", methods=["DELETE"])
327 1
    def delete_circuit(self, circuit_id):
328
        """Remove a circuit.
329
330
        First, the flows are removed from the switches, and then the EVC is
331
        disabled.
332
        """
333 1
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
334 1
        try:
335 1
            evc = self.circuits[circuit_id]
336 1
        except KeyError:
337 1
            result = f"circuit_id {circuit_id} not found"
338 1
            log.debug("delete_circuit result %s %s", result, 404)
339 1
            raise NotFound(result) from NotFound
340
341 1
        if evc.archived:
342 1
            result = f"Circuit {circuit_id} already removed"
343 1
            log.debug("delete_circuit result %s %s", result, 404)
344 1
            raise NotFound(result) from NotFound
345
346 1
        log.info("Removing %s", evc)
347 1
        with evc.lock:
348 1
            evc.remove_current_flows()
349 1
            evc.remove_failover_flows(sync=False)
350 1
            evc.deactivate()
351 1
            evc.disable()
352 1
            self.sched.remove(evc)
353 1
            evc.archive()
354 1
            evc.sync()
355 1
        log.info("EVC removed. %s", evc)
356 1
        result = {"response": f"Circuit {circuit_id} removed"}
357 1
        status = 200
358
359 1
        log.debug("delete_circuit result %s %s", result, status)
360 1
        emit_event(self.controller, "deleted",
361
                   content=map_evc_event_content(evc))
362 1
        return jsonify(result), status
363
364 1
    @rest("v2/evc/<circuit_id>/metadata", methods=["GET"])
365 1
    def get_metadata(self, circuit_id):
366
        """Get metadata from an EVC."""
367 1
        try:
368 1
            return (
369
                jsonify({"metadata": self.circuits[circuit_id].metadata}),
370
                200,
371
            )
372
        except KeyError as error:
373
            raise NotFound(f"circuit_id {circuit_id} not found.") from error
374
375 1
    @rest("v2/evc/<circuit_id>/metadata", methods=["POST"])
376 1
    def add_metadata(self, circuit_id):
377
        """Add metadata to an EVC."""
378 1
        try:
379 1
            metadata = request.get_json()
380 1
            content_type = request.content_type
381 1
        except BadRequest as error:
382 1
            result = "The request body is not a well-formed JSON."
383 1
            raise BadRequest(result) from error
384 1
        if content_type is None:
385 1
            result = "The request body is empty."
386 1
            raise BadRequest(result)
387 1
        if metadata is None:
388 1
            if content_type != "application/json":
389 1
                result = (
390
                    "The content type must be application/json "
391
                    f"(received {content_type})."
392
                )
393
            else:
394
                result = "Metadata is empty."
395 1
            raise UnsupportedMediaType(result)
396
397 1
        try:
398 1
            evc = self.circuits[circuit_id]
399 1
        except KeyError as error:
400 1
            raise NotFound(f"circuit_id {circuit_id} not found.") from error
401
402 1
        evc.extend_metadata(metadata)
403 1
        evc.sync()
404 1
        return jsonify("Operation successful"), 201
405
406 1
    @rest("v2/evc/<circuit_id>/metadata/<key>", methods=["DELETE"])
407 1
    def delete_metadata(self, circuit_id, key):
408
        """Delete metadata from an EVC."""
409 1
        try:
410 1
            evc = self.circuits[circuit_id]
411 1
        except KeyError as error:
412 1
            raise NotFound(f"circuit_id {circuit_id} not found.") from error
413
414 1
        evc.remove_metadata(key)
415 1
        evc.sync()
416 1
        return jsonify("Operation successful"), 200
417
418 1
    @rest("/v2/evc/<circuit_id>/redeploy", methods=["PATCH"])
419 1
    def redeploy(self, circuit_id):
420
        """Endpoint to force the redeployment of an EVC."""
421 1
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
422 1
        try:
423 1
            evc = self.circuits[circuit_id]
424 1
        except KeyError:
425 1
            result = f"circuit_id {circuit_id} not found"
426 1
            raise NotFound(result) from NotFound
427 1
        if evc.is_enabled():
428 1
            with evc.lock:
429 1
                evc.remove_current_flows()
430 1
                evc.deploy()
431 1
            result = {"response": f"Circuit {circuit_id} redeploy received."}
432 1
            status = 202
433
        else:
434 1
            result = {"response": f"Circuit {circuit_id} is disabled."}
435 1
            status = 409
436
437 1
        return jsonify(result), status
438
439 1
    @rest("/v2/evc/schedule", methods=["GET"])
440 1
    def list_schedules(self):
441
        """Endpoint to return all schedules stored for all circuits.
442
443
        Return a JSON with the following template:
444
        [{"schedule_id": <schedule_id>,
445
         "circuit_id": <circuit_id>,
446
         "schedule": <schedule object>}]
447
        """
448 1
        log.debug("list_schedules /v2/evc/schedule")
449 1
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
450 1
        if not circuits:
451 1
            result = {}
452 1
            status = 200
453 1
            return jsonify(result), status
454
455 1
        result = []
456 1
        status = 200
457 1
        for circuit in circuits:
458 1
            circuit_scheduler = circuit.get("circuit_scheduler")
459 1
            if circuit_scheduler:
460 1
                for scheduler in circuit_scheduler:
461 1
                    value = {
462
                        "schedule_id": scheduler.get("id"),
463
                        "circuit_id": circuit.get("id"),
464
                        "schedule": scheduler,
465
                    }
466 1
                    result.append(value)
467
468 1
        log.debug("list_schedules result %s %s", result, status)
469 1
        return jsonify(result), status
470
471 1
    @rest("/v2/evc/schedule/", methods=["POST"])
472 1
    def create_schedule(self):
473
        """
474
        Create a new schedule for a given circuit.
475
476
        This service do no check if there are conflicts with another schedule.
477
        Payload example:
478
            {
479
              "circuit_id":"aa:bb:cc",
480
              "schedule": {
481
                "date": "2019-08-07T14:52:10.967Z",
482
                "interval": "string",
483
                "frequency": "1 * * * *",
484
                "action": "create"
485
              }
486
            }
487
        """
488 1
        log.debug("create_schedule /v2/evc/schedule/")
489
490 1
        json_data = self._json_from_request("create_schedule")
491 1
        try:
492 1
            circuit_id = json_data["circuit_id"]
493 1
        except TypeError:
494 1
            result = "The payload should have a dictionary."
495 1
            log.debug("create_schedule result %s %s", result, 400)
496 1
            raise BadRequest(result) from BadRequest
497 1
        except KeyError:
498 1
            result = "Missing circuit_id."
499 1
            log.debug("create_schedule result %s %s", result, 400)
500 1
            raise BadRequest(result) from BadRequest
501
502 1
        try:
503 1
            schedule_data = json_data["schedule"]
504 1
        except KeyError:
505 1
            result = "Missing schedule data."
506 1
            log.debug("create_schedule result %s %s", result, 400)
507 1
            raise BadRequest(result) from BadRequest
508
509
        # Get EVC from circuits buffer
510 1
        circuits = self._get_circuits_buffer()
511
512
        # get the circuit
513 1
        evc = circuits.get(circuit_id)
514
515
        # get the circuit
516 1
        if not evc:
517 1
            result = f"circuit_id {circuit_id} not found"
518 1
            log.debug("create_schedule result %s %s", result, 404)
519 1
            raise NotFound(result) from NotFound
520
        # Can not modify circuits deleted and archived
521 1
        if evc.archived:
522 1
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
523 1
            log.debug("create_schedule result %s %s", result, 403)
524 1
            raise Forbidden(result) from Forbidden
525
526
        # new schedule from dict
527 1
        new_schedule = CircuitSchedule.from_dict(schedule_data)
528
529
        # If there is no schedule, create the list
530 1
        if not evc.circuit_scheduler:
531 1
            evc.circuit_scheduler = []
532
533
        # Add the new schedule
534 1
        evc.circuit_scheduler.append(new_schedule)
535
536
        # Add schedule job
537 1
        self.sched.add_circuit_job(evc, new_schedule)
538
539
        # save circuit to mongodb
540 1
        evc.sync()
541
542 1
        result = new_schedule.as_dict()
543 1
        status = 201
544
545 1
        log.debug("create_schedule result %s %s", result, status)
546 1
        return jsonify(result), status
547
548 1
    @rest("/v2/evc/schedule/<schedule_id>", methods=["PATCH"])
549 1
    def update_schedule(self, schedule_id):
550
        """Update a schedule.
551
552
        Change all attributes from the given schedule from a EVC circuit.
553
        The schedule ID is preserved as default.
554
        Payload example:
555
            {
556
              "date": "2019-08-07T14:52:10.967Z",
557
              "interval": "string",
558
              "frequency": "1 * * *",
559
              "action": "create"
560
            }
561
        """
562 1
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)
563
564
        # Try to find a circuit schedule
565 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
566
567
        # Can not modify circuits deleted and archived
568 1
        if not found_schedule:
569 1
            result = f"schedule_id {schedule_id} not found"
570 1
            log.debug("update_schedule result %s %s", result, 404)
571 1
            raise NotFound(result) from NotFound
572 1
        if evc.archived:
573 1
            result = f"Circuit {evc.id} is archived. Update is forbidden."
574 1
            log.debug("update_schedule result %s %s", result, 403)
575 1
            raise Forbidden(result) from Forbidden
576
577 1
        data = self._json_from_request("update_schedule")
578
579 1
        new_schedule = CircuitSchedule.from_dict(data)
580 1
        new_schedule.id = found_schedule.id
581
        # Remove the old schedule
582 1
        evc.circuit_scheduler.remove(found_schedule)
583
        # Append the modified schedule
584 1
        evc.circuit_scheduler.append(new_schedule)
585
586
        # Cancel all schedule jobs
587 1
        self.sched.cancel_job(found_schedule.id)
588
        # Add the new circuit schedule
589 1
        self.sched.add_circuit_job(evc, new_schedule)
590
        # Save EVC to mongodb
591 1
        evc.sync()
592
593 1
        result = new_schedule.as_dict()
594 1
        status = 200
595
596 1
        log.debug("update_schedule result %s %s", result, status)
597 1
        return jsonify(result), status
598
599 1
    @rest("/v2/evc/schedule/<schedule_id>", methods=["DELETE"])
600 1
    def delete_schedule(self, schedule_id):
601
        """Remove a circuit schedule.
602
603
        Remove the Schedule from EVC.
604
        Remove the Schedule from cron job.
605
        Save the EVC to the Storehouse.
606
        """
607 1
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
608 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
609
610
        # Can not modify circuits deleted and archived
611 1
        if not found_schedule:
612 1
            result = f"schedule_id {schedule_id} not found"
613 1
            log.debug("delete_schedule result %s %s", result, 404)
614 1
            raise NotFound(result)
615
616 1
        if evc.archived:
617 1
            result = f"Circuit {evc.id} is archived. Update is forbidden."
618 1
            log.debug("delete_schedule result %s %s", result, 403)
619 1
            raise Forbidden(result)
620
621
        # Remove the old schedule
622 1
        evc.circuit_scheduler.remove(found_schedule)
623
624
        # Cancel all schedule jobs
625 1
        self.sched.cancel_job(found_schedule.id)
626
        # Save EVC to mongodb
627 1
        evc.sync()
628
629 1
        result = "Schedule removed"
630 1
        status = 200
631
632 1
        log.debug("delete_schedule result %s %s", result, status)
633 1
        return jsonify(result), status
634
635 1
    def _is_duplicated_evc(self, evc):
636
        """Verify if the circuit given is duplicated with the stored evcs.
637
638
        Args:
639
            evc (EVC): circuit to be analysed.
640
641
        Returns:
642
            boolean: True if the circuit is duplicated, otherwise False.
643
644
        """
645 1
        for circuit in tuple(self.circuits.values()):
646 1
            if not circuit.archived and circuit.shares_uni(evc):
647 1
                return True
648 1
        return False
649
650 1
    @listen_to("kytos/topology.link_up")
651 1
    def on_link_up(self, event):
652
        """Change circuit when link is up or end_maintenance."""
653
        self.handle_link_up(event)
654
655 1
    def handle_link_up(self, event):
656
        """Change circuit when link is up or end_maintenance."""
657 1
        log.info("Event handle_link_up %s", event.content["link"])
658 1
        for evc in self.get_evcs_by_svc_level():
659 1
            if evc.is_enabled() and not evc.archived:
660 1
                with evc.lock:
661 1
                    evc.handle_link_up(event.content["link"])
662
663 1
    @listen_to("kytos/topology.link_down")
664 1
    def on_link_down(self, event):
665
        """Change circuit when link is down or under_mantenance."""
666
        self.handle_link_down(event)
667
668 1
    def handle_link_down(self, event):
669
        """Change circuit when link is down or under_mantenance."""
670 1
        link = event.content["link"]
671 1
        log.info("Event handle_link_down %s", link)
672 1
        switch_flows = {}
673 1
        evcs_with_failover = []
674 1
        evcs_normal = []
675 1
        check_failover = []
676 1
        for evc in self.get_evcs_by_svc_level():
677 1
            if evc.is_affected_by_link(link):
678
                # if there is no failover path, handles link down the
679
                # tradditional way
680 1
                if (
681
                    not getattr(evc, 'failover_path', None) or
682
                    evc.is_failover_path_affected_by_link(link)
683
                ):
684 1
                    evcs_normal.append(evc)
685 1
                    continue
686 1
                for dpid, flows in evc.get_failover_flows().items():
687 1
                    switch_flows.setdefault(dpid, [])
688 1
                    switch_flows[dpid].extend(flows)
689 1
                evcs_with_failover.append(evc)
690
            else:
691 1
                check_failover.append(evc)
692
693 1
        offset = 0
694 1
        while switch_flows:
695 1
            offset = (offset + settings.BATCH_SIZE) or None
696 1
            switches = list(switch_flows.keys())
697 1
            for dpid in switches:
698 1
                emit_event(
699
                    self.controller,
700
                    context="kytos.flow_manager",
701
                    name="flows.install",
702
                    content={
703
                        "dpid": dpid,
704
                        "flow_dict": {"flows": switch_flows[dpid][:offset]},
705
                    }
706
                )
707 1
                if offset is None or offset >= len(switch_flows[dpid]):
708 1
                    del switch_flows[dpid]
709 1
                    continue
710 1
                switch_flows[dpid] = switch_flows[dpid][offset:]
711 1
            time.sleep(settings.BATCH_INTERVAL)
712
713 1
        for evc in evcs_with_failover:
714 1
            with evc.lock:
715 1
                old_path = evc.current_path
716 1
                evc.current_path = evc.failover_path
717 1
                evc.failover_path = old_path
718 1
                evc.sync()
719 1
            emit_event(self.controller, "redeployed_link_down",
720
                       content=map_evc_event_content(evc))
721 1
            log.info(
722
                f"{evc} redeployed with failover due to link down {link.id}"
723
            )
724
725 1
        for evc in evcs_normal:
726 1
            emit_event(
727
                self.controller,
728
                "evc_affected_by_link_down",
729
                content={"link_id": link.id} | map_evc_event_content(evc)
730
            )
731
732
        # After handling the hot path, check if new failover paths are needed.
733
        # Note that EVCs affected by link down will generate a KytosEvent for
734
        # deployed|redeployed, which will trigger the failover path setup.
735
        # Thus, we just need to further check the check_failover list
736 1
        for evc in check_failover:
737 1
            if evc.is_failover_path_affected_by_link(link):
738 1
                evc.setup_failover_path()
739
740 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
741 1
    def on_evc_affected_by_link_down(self, event):
742
        """Change circuit when link is down or under_mantenance."""
743
        self.handle_evc_affected_by_link_down(event)
744
745 1
    def handle_evc_affected_by_link_down(self, event):
746
        """Change circuit when link is down or under_mantenance."""
747 1
        evc = self.circuits.get(event.content["evc_id"])
748 1
        link_id = event.content['link_id']
749 1
        if not evc:
750 1
            return
751 1
        with evc.lock:
752 1
            result = evc.handle_link_down()
753 1
        event_name = "error_redeploy_link_down"
754 1
        if result:
755 1
            log.info(f"{evc} redeployed due to link down {link_id}")
756 1
            event_name = "redeployed_link_down"
757 1
        emit_event(self.controller, event_name,
758
                   content=map_evc_event_content(evc))
759
760 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
761 1
    def on_evc_deployed(self, event):
762
        """Handle EVC deployed|redeployed_link_down."""
763
        self.handle_evc_deployed(event)
764
765 1
    def handle_evc_deployed(self, event):
766
        """Setup failover path on evc deployed."""
767 1
        evc = self.circuits.get(event.content["evc_id"])
768 1
        if not evc:
769 1
            return
770 1
        with evc.lock:
771 1
            evc.setup_failover_path()
772
773 1
    @listen_to("kytos/topology.topology_loaded")
774 1
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
775
        """Load EVCs once the topology is available."""
776
        self.load_all_evcs()
777
778 1
    def load_all_evcs(self):
779
        """Try to load all EVCs on startup."""
780 1
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
781 1
        for circuit_id, circuit in circuits:
782 1
            if circuit_id not in self.circuits:
783 1
                self._load_evc(circuit)
784
785 1
    def _load_evc(self, circuit_dict):
786
        """Load one EVC from mongodb to memory."""
787 1
        try:
788 1
            evc = self._evc_from_dict(circuit_dict)
789 1
        except ValueError as exception:
790 1
            log.error(
791
                f"Could not load EVC: dict={circuit_dict} error={exception}"
792
            )
793 1
            return None
794
795 1
        if evc.archived:
796 1
            return None
797 1
        evc.deactivate()
798 1
        evc.sync()
799 1
        self.circuits.setdefault(evc.id, evc)
800 1
        self.sched.add(evc)
801 1
        return evc
802
803 1
    @listen_to("kytos/flow_manager.flow.error")
804 1
    def on_flow_mod_error(self, event):
805
        """Handle flow mod errors related to an EVC."""
806
        self.handle_flow_mod_error(event)
807
808 1
    def handle_flow_mod_error(self, event):
809
        """Handle flow mod errors related to an EVC."""
810 1
        flow = event.content["flow"]
811 1
        command = event.content.get("error_command")
812 1
        if command != "add":
813
            return
814 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
815 1
        if evc:
816 1
            evc.remove_current_flows()
817
818 1
    def _evc_dict_with_instances(self, evc_dict):
819
        """Convert some dict values to instance of EVC classes.
820
821
        This method will convert: [UNI, Link]
822
        """
823 1
        data = evc_dict.copy()  # Do not modify the original dict
824 1
        for attribute, value in data.items():
825
            # Get multiple attributes.
826
            # Ex: uni_a, uni_z
827 1
            if "uni" in attribute:
828 1
                try:
829 1
                    data[attribute] = self._uni_from_dict(value)
830 1
                except ValueError as exception:
831 1
                    result = "Error creating UNI: Invalid value"
832 1
                    raise ValueError(result) from exception
833
834 1
            if attribute == "circuit_scheduler":
835 1
                data[attribute] = []
836 1
                for schedule in value:
837 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
838
839
            # Get multiple attributes.
840
            # Ex: primary_links,
841
            #     backup_links,
842
            #     current_links_cache,
843
            #     primary_links_cache,
844
            #     backup_links_cache
845 1
            if "links" in attribute:
846 1
                data[attribute] = [
847
                    self._link_from_dict(link) for link in value
848
                ]
849
850
            # Ex: current_path,
851
            #     primary_path,
852
            #     backup_path
853 1
            if "path" in attribute and attribute != "dynamic_backup_path":
854 1
                data[attribute] = Path(
855
                    [self._link_from_dict(link) for link in value]
856
                )
857
858 1
        return data
859
860 1
    def _evc_from_dict(self, evc_dict):
861 1
        data = self._evc_dict_with_instances(evc_dict)
862 1
        return EVC(self.controller, **data)
863
864 1
    def _uni_from_dict(self, uni_dict):
865
        """Return a UNI object from python dict."""
866 1
        if uni_dict is None:
867 1
            return False
868
869 1
        interface_id = uni_dict.get("interface_id")
870 1
        interface = self.controller.get_interface_by_id(interface_id)
871 1
        if interface is None:
872 1
            result = (
873
                "Error creating UNI:"
874
                + f"Could not instantiate interface {interface_id}"
875
            )
876 1
            raise ValueError(result) from ValueError
877
878 1
        tag_dict = uni_dict.get("tag", None)
879 1
        if tag_dict:
880 1
            tag = TAG.from_dict(tag_dict)
881
        else:
882 1
            tag = None
883 1
        uni = UNI(interface, tag)
884
885 1
        return uni
886
887 1
    def _link_from_dict(self, link_dict):
888
        """Return a Link object from python dict."""
889 1
        id_a = link_dict.get("endpoint_a").get("id")
890 1
        id_b = link_dict.get("endpoint_b").get("id")
891
892 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
893 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
894 1
        if not endpoint_a:
895 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
896 1
            raise ValueError(error_msg)
897 1
        if not endpoint_b:
898
            error_msg = f"Could not get interface endpoint_b id {id_b}"
899
            raise ValueError(error_msg)
900
901 1
        link = Link(endpoint_a, endpoint_b)
902 1
        if "metadata" in link_dict:
903 1
            link.extend_metadata(link_dict.get("metadata"))
904
905 1
        s_vlan = link.get_metadata("s_vlan")
906 1
        if s_vlan:
907 1
            tag = TAG.from_dict(s_vlan)
908 1
            if tag is False:
909
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
910
                raise ValueError(error_msg)
911 1
            link.update_metadata("s_vlan", tag)
912 1
        return link
913
914 1
    def _find_evc_by_schedule_id(self, schedule_id):
915
        """
916
        Find an EVC and CircuitSchedule based on schedule_id.
917
918
        :param schedule_id: Schedule ID
919
        :return: EVC and Schedule
920
        """
921 1
        circuits = self._get_circuits_buffer()
922 1
        found_schedule = None
923 1
        evc = None
924
925
        # pylint: disable=unused-variable
926 1
        for c_id, circuit in circuits.items():
927 1
            for schedule in circuit.circuit_scheduler:
928 1
                if schedule.id == schedule_id:
929 1
                    found_schedule = schedule
930 1
                    evc = circuit
931 1
                    break
932 1
            if found_schedule:
933 1
                break
934 1
        return evc, found_schedule
935
936 1
    def _get_circuits_buffer(self):
937
        """
938
        Return the circuit buffer.
939
940
        If the buffer is empty, try to load data from mongodb.
941
        """
942 1
        if not self.circuits:
943
            # Load circuits from mongodb to buffer
944 1
            circuits = self.mongo_controller.get_circuits()['circuits']
945 1
            for c_id, circuit in circuits.items():
946 1
                evc = self._evc_from_dict(circuit)
947 1
                self.circuits[c_id] = evc
948 1
        return self.circuits
949
950 1
    @staticmethod
951 1
    def _json_from_request(caller):
952
        """Return a json from request.
953
954
        If it was not possible to get a json from the request, log, for debug,
955
        who was the caller and the error that ocurred, and raise an
956
        Exception.
957
        """
958 1
        try:
959 1
            json_data = request.get_json()
960 1
        except ValueError as exception:
961
            log.error(exception)
962
            log.debug(f"{caller} result {exception} 400")
963
            raise BadRequest(str(exception)) from BadRequest
964 1
        except BadRequest:
965 1
            result = "The request is not a valid JSON."
966 1
            log.debug(f"{caller} result {result} 400")
967 1
            raise BadRequest(result) from BadRequest
968 1
        if json_data is None:
969 1
            result = "Content-Type must be application/json"
970 1
            log.debug(f"{caller} result {result} 415")
971 1
            raise UnsupportedMediaType(result)
972
        return json_data
973