Passed
Pull Request — master (#258)
by
unknown
04:01
created

build.main.Main._evc_from_dict()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 2
dl 0
loc 3
ccs 3
cts 3
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import time
7 1
from threading import Lock
8
9 1
from flask import jsonify, request
10 1
from pydantic import ValidationError
11 1
from werkzeug.exceptions import (BadRequest, Conflict, Forbidden,
12
                                 MethodNotAllowed, NotFound,
13
                                 UnsupportedMediaType)
14
15 1
from kytos.core import KytosNApp, log, rest
16 1
from kytos.core.helpers import listen_to
17 1
from kytos.core.interface import TAG, UNI
18 1
from kytos.core.link import Link
19 1
from napps.kytos.mef_eline import controllers, settings
20 1
from napps.kytos.mef_eline.exceptions import InvalidPath
21 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
22
                                          Path)
23 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
24 1
from napps.kytos.mef_eline.utils import (emit_event, load_spec,
25
                                         map_evc_event_content, validate)
26
27
28
# pylint: disable=too-many-public-methods
29 1
class Main(KytosNApp):
30
    """Main class of amlight/mef_eline NApp.
31
32
    This class is the entry point for this napp.
33
    """
34
35 1
    spec = load_spec()
36
37 1
    def setup(self):
38
        """Replace the '__init__' method for the KytosNApp subclass.
39
40
        The setup method is automatically called by the controller when your
41
        application is loaded.
42
43
        So, if you have any setup routine, insert it here.
44
        """
45
        # object used to scheduler circuit events
46 1
        self.sched = Scheduler()
47
48
        # object to save and load circuits
49 1
        self.mongo_controller = self.get_eline_controller()
50 1
        self.mongo_controller.bootstrap_indexes()
51
52
        # set the controller that will manager the dynamic paths
53 1
        DynamicPathManager.set_controller(self.controller)
54
55
        # dictionary of EVCs created. It acts as a circuit buffer.
56
        # Every create/update/delete must be synced to mongodb.
57 1
        self.circuits = {}
58
59 1
        self._lock = Lock()
60 1
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)
61
62 1
        self.load_all_evcs()
63
64 1
    def get_evcs_by_svc_level(self) -> list:
65
        """Get circuits sorted by desc service level and asc creation_time.
66
67
        In the future, as more ops are offloaded it should be get from the DB.
68
        """
69 1
        return sorted(self.circuits.values(),
70
                      key=lambda x: (-x.service_level, x.creation_time))
71
72 1
    @staticmethod
73 1
    def get_eline_controller():
74
        """Return the ELineController instance."""
75
        return controllers.ELineController()
76
77 1
    def execute(self):
78
        """Execute once when the napp is running."""
79 1
        if self._lock.locked():
80 1
            return
81 1
        log.debug("Starting consistency routine")
82 1
        with self._lock:
83 1
            self.execute_consistency()
84 1
        log.debug("Finished consistency routine")
85
86 1
    @staticmethod
87 1
    def should_be_checked(circuit):
88
        "Verify if the circuit meets the necessary conditions to be checked"
89
        # pylint: disable=too-many-boolean-expressions
90 1
        if (
91
                circuit.is_enabled()
92
                and not circuit.is_active()
93
                and not circuit.lock.locked()
94
                and not circuit.has_recent_removed_flow()
95
                and not circuit.is_recent_updated()
96
                # if a inter-switch EVC does not have current_path, it does not
97
                # make sense to run sdntrace on it
98
                and (circuit.is_intra_switch() or circuit.current_path)
99
                ):
100 1
            return True
101
        return False
102
103 1
    def execute_consistency(self):
104
        """Execute consistency routine."""
105 1
        circuits_to_check = []
106 1
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
107 1
        for circuit in self.get_evcs_by_svc_level():
108 1
            stored_circuits.pop(circuit.id, None)
109 1
            if self.should_be_checked(circuit):
110 1
                circuits_to_check.append(circuit)
111 1
        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
112 1
        for circuit in circuits_to_check:
113 1
            is_checked = circuits_checked.get(circuit.id)
114 1
            if is_checked:
115 1
                circuit.execution_rounds = 0
116 1
                log.info(f"{circuit} enabled but inactive - activating")
117 1
                with circuit.lock:
118 1
                    circuit.activate()
119 1
                    circuit.sync()
120
            else:
121 1
                circuit.execution_rounds += 1
122 1
                if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
123 1
                    log.info(f"{circuit} enabled but inactive - redeploy")
124 1
                    with circuit.lock:
125 1
                        circuit.deploy()
126 1
        for circuit_id in stored_circuits:
127 1
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
128 1
            self._load_evc(stored_circuits[circuit_id])
129
130 1
    def shutdown(self):
131
        """Execute when your napp is unloaded.
132
133
        If you have some cleanup procedure, insert it here.
134
        """
135
136 1
    @rest("/v2/evc/", methods=["GET"])
137 1
    def list_circuits(self):
138
        """Endpoint to return circuits stored.
139
140
        archive query arg if defined (not null) will be filtered
141
        accordingly, by default only non archived evcs will be listed
142
        """
143 1
        log.debug("list_circuits /v2/evc")
144 1
        archived = request.args.get("archived", "false").lower()
145 1
        archived_to_optional = {"null": None, "true": True, "false": False}
146 1
        archived = archived_to_optional.get(archived, False)
147 1
        circuits = self.mongo_controller.get_circuits(archived=archived)
148 1
        circuits = circuits['circuits']
149 1
        return jsonify(circuits), 200
150
151 1
    @rest("/v2/evc/<circuit_id>", methods=["GET"])
152 1
    def get_circuit(self, circuit_id):
153
        """Endpoint to return a circuit based on id."""
154 1
        log.debug("get_circuit /v2/evc/%s", circuit_id)
155 1
        circuit = self.mongo_controller.get_circuit(circuit_id)
156 1
        if not circuit:
157 1
            result = f"circuit_id {circuit_id} not found"
158 1
            log.debug("get_circuit result %s %s", result, 404)
159 1
            raise NotFound(result)
160 1
        status = 200
161 1
        log.debug("get_circuit result %s %s", circuit, status)
162 1
        return jsonify(circuit), status
163
164 1
    @rest("/v2/evc/", methods=["POST"])
165 1
    @validate(spec)
166 1
    def create_circuit(self, data):
167
        """Try to create a new circuit.
168
169
        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
170
        UNI_Z's requested C-VID are available from the interfaces' pools. This
171
        is checked when creating the UNI object.
172
173
        Then, E-Line NApp requests a primary and a backup path to the
174
        Pathfinder NApp using the attributes primary_links and backup_links
175
        submitted via REST
176
177
        # For each link composing paths in #3:
178
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
179
        #  - Using the S-VID obtained, generate abstract flow entries to be
180
        #    sent to FlowManager
181
182
        Push abstract flow entries to FlowManager and FlowManager pushes
183
        OpenFlow entries to datapaths
184
185
        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
186
        creation
187
188
        Finnaly, notify user of the status of its request.
189
        """
190
        # Try to create the circuit object
191 1
        log.debug("create_circuit /v2/evc/")
192
193 1
        try:
194 1
            evc = self._evc_from_dict(data)
195 1
        except ValueError as exception:
196 1
            log.debug("create_circuit result %s %s", exception, 400)
197 1
            raise BadRequest(str(exception)) from BadRequest
198
199 1
        if evc.primary_path:
200
            try:
201
                evc.primary_path.is_valid(
202
                    evc.uni_a.interface.switch,
203
                    evc.uni_z.interface.switch,
204
                    bool(evc.circuit_scheduler),
205
                )
206
            except InvalidPath as exception:
207
                raise BadRequest(
208
                    f"primary_path is not valid: {exception}"
209
                ) from exception
210 1
        if evc.backup_path:
211
            try:
212
                evc.backup_path.is_valid(
213
                    evc.uni_a.interface.switch,
214
                    evc.uni_z.interface.switch,
215
                    bool(evc.circuit_scheduler),
216
                )
217
            except InvalidPath as exception:
218
                raise BadRequest(
219
                    f"backup_path is not valid: {exception}"
220
                ) from exception
221
222
        # verify duplicated evc
223 1
        if self._is_duplicated_evc(evc):
224 1
            result = "The EVC already exists."
225 1
            log.debug("create_circuit result %s %s", result, 409)
226 1
            raise Conflict(result)
227
228 1
        try:
229 1
            evc._validate_has_primary_or_dynamic()
230 1
        except ValueError as exception:
231 1
            raise BadRequest(str(exception)) from exception
232
233
        # save circuit
234 1
        try:
235 1
            evc.sync()
236
        except ValidationError as exception:
237
            raise BadRequest(str(exception)) from exception
238
239
        # store circuit in dictionary
240 1
        self.circuits[evc.id] = evc
241
242
        # Schedule the circuit deploy
243 1
        self.sched.add(evc)
244
245
        # Circuit has no schedule, deploy now
246 1
        if not evc.circuit_scheduler:
247 1
            with evc.lock:
248 1
                evc.deploy()
249
250
        # Notify users
251 1
        result = {"circuit_id": evc.id}
252 1
        status = 201
253 1
        log.debug("create_circuit result %s %s", result, status)
254 1
        emit_event(self.controller, name="created",
255
                   content=map_evc_event_content(evc))
256 1
        return jsonify(result), status
257
258 1
    @listen_to('kytos/flow_manager.flow.removed')
259 1
    def on_flow_delete(self, event):
260
        """Capture delete messages to keep track when flows got removed."""
261
        self.handle_flow_delete(event)
262
263 1
    def handle_flow_delete(self, event):
264
        """Keep track when the EVC got flows removed by deriving its cookie."""
265 1
        flow = event.content["flow"]
266 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
267 1
        if evc:
268 1
            log.debug("Flow removed in EVC %s", evc.id)
269 1
            evc.set_flow_removed_at()
270
271 1
    @rest("/v2/evc/<circuit_id>", methods=["PATCH"])
272 1
    def update(self, circuit_id):
273
        """Update a circuit based on payload.
274
275
        The EVC required attributes (name, uni_a, uni_z) can't be updated.
276
        """
277 1
        log.debug("update /v2/evc/%s", circuit_id)
278 1
        try:
279 1
            evc = self.circuits[circuit_id]
280 1
        except KeyError:
281 1
            result = f"circuit_id {circuit_id} not found"
282 1
            log.debug("update result %s %s", result, 404)
283 1
            raise NotFound(result) from NotFound
284
285 1
        if evc.archived:
286 1
            result = "Can't update archived EVC"
287 1
            log.debug("update result %s %s", result, 405)
288 1
            raise MethodNotAllowed(["GET"], result)
289
290 1
        try:
291 1
            data = request.get_json()
292 1
        except BadRequest:
293 1
            result = "The request body is not a well-formed JSON."
294 1
            log.debug("update result %s %s", result, 400)
295 1
            raise BadRequest(result) from BadRequest
296 1
        if data is None:
297 1
            result = "The request body mimetype is not application/json."
298 1
            log.debug("update result %s %s", result, 415)
299 1
            raise UnsupportedMediaType(result) from UnsupportedMediaType
300
301 1
        try:
302 1
            enable, redeploy = evc.update(
303
                **self._evc_dict_with_instances(data)
304
            )
305 1
        except ValueError as exception:
306 1
            log.error(exception)
307 1
            log.debug("update result %s %s", exception, 400)
308 1
            raise BadRequest(str(exception)) from BadRequest
309
310 1
        if evc.is_active():
311
            if enable is False:  # disable if active
312
                with evc.lock:
313
                    evc.remove()
314
            elif redeploy is not None:  # redeploy if active
315
                with evc.lock:
316
                    evc.remove()
317
                    evc.deploy()
318
        else:
319 1
            if enable is True:  # enable if inactive
320 1
                with evc.lock:
321 1
                    evc.deploy()
322 1
        result = {evc.id: evc.as_dict()}
323 1
        status = 200
324
325 1
        log.debug("update result %s %s", result, status)
326 1
        emit_event(self.controller, "updated",
327
                   content=map_evc_event_content(evc, **data))
328 1
        return jsonify(result), status
329
330 1
    @rest("/v2/evc/<circuit_id>", methods=["DELETE"])
331 1
    def delete_circuit(self, circuit_id):
332
        """Remove a circuit.
333
334
        First, the flows are removed from the switches, and then the EVC is
335
        disabled.
336
        """
337 1
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
338 1
        try:
339 1
            evc = self.circuits[circuit_id]
340 1
        except KeyError:
341 1
            result = f"circuit_id {circuit_id} not found"
342 1
            log.debug("delete_circuit result %s %s", result, 404)
343 1
            raise NotFound(result) from NotFound
344
345 1
        if evc.archived:
346 1
            result = f"Circuit {circuit_id} already removed"
347 1
            log.debug("delete_circuit result %s %s", result, 404)
348 1
            raise NotFound(result) from NotFound
349
350 1
        log.info("Removing %s", evc)
351 1
        with evc.lock:
352 1
            evc.remove_current_flows()
353 1
            evc.remove_failover_flows(sync=False)
354 1
            evc.deactivate()
355 1
            evc.disable()
356 1
            self.sched.remove(evc)
357 1
            evc.archive()
358 1
            evc.sync()
359 1
        log.info("EVC removed. %s", evc)
360 1
        result = {"response": f"Circuit {circuit_id} removed"}
361 1
        status = 200
362
363 1
        log.debug("delete_circuit result %s %s", result, status)
364 1
        emit_event(self.controller, "deleted",
365
                   content=map_evc_event_content(evc))
366 1
        return jsonify(result), status
367
368 1
    @rest("v2/evc/<circuit_id>/metadata", methods=["GET"])
369 1
    def get_metadata(self, circuit_id):
370
        """Get metadata from an EVC."""
371 1
        try:
372 1
            return (
373
                jsonify({"metadata": self.circuits[circuit_id].metadata}),
374
                200,
375
            )
376
        except KeyError as error:
377
            raise NotFound(f"circuit_id {circuit_id} not found.") from error
378
379 1
    @rest("v2/evc/<circuit_id>/metadata", methods=["POST"])
380 1
    def add_metadata(self, circuit_id):
381
        """Add metadata to an EVC."""
382 1
        try:
383 1
            metadata = request.get_json()
384 1
            content_type = request.content_type
385 1
        except BadRequest as error:
386 1
            result = "The request body is not a well-formed JSON."
387 1
            raise BadRequest(result) from error
388 1
        if content_type is None:
389 1
            result = "The request body is empty."
390 1
            raise BadRequest(result)
391 1
        if metadata is None:
392 1
            if content_type != "application/json":
393 1
                result = (
394
                    "The content type must be application/json "
395
                    f"(received {content_type})."
396
                )
397
            else:
398
                result = "Metadata is empty."
399 1
            raise UnsupportedMediaType(result)
400
401 1
        try:
402 1
            evc = self.circuits[circuit_id]
403 1
        except KeyError as error:
404 1
            raise NotFound(f"circuit_id {circuit_id} not found.") from error
405
406 1
        evc.extend_metadata(metadata)
407 1
        evc.sync()
408 1
        return jsonify("Operation successful"), 201
409
410 1
    @rest("v2/evc/<circuit_id>/metadata/<key>", methods=["DELETE"])
411 1
    def delete_metadata(self, circuit_id, key):
412
        """Delete metadata from an EVC."""
413 1
        try:
414 1
            evc = self.circuits[circuit_id]
415 1
        except KeyError as error:
416 1
            raise NotFound(f"circuit_id {circuit_id} not found.") from error
417
418 1
        evc.remove_metadata(key)
419 1
        evc.sync()
420 1
        return jsonify("Operation successful"), 200
421
422 1
    @rest("/v2/evc/<circuit_id>/redeploy", methods=["PATCH"])
423 1
    def redeploy(self, circuit_id):
424
        """Endpoint to force the redeployment of an EVC."""
425 1
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
426 1
        try:
427 1
            evc = self.circuits[circuit_id]
428 1
        except KeyError:
429 1
            result = f"circuit_id {circuit_id} not found"
430 1
            raise NotFound(result) from NotFound
431 1
        if evc.is_enabled():
432 1
            with evc.lock:
433 1
                evc.remove_current_flows()
434 1
                evc.deploy()
435 1
            result = {"response": f"Circuit {circuit_id} redeploy received."}
436 1
            status = 202
437
        else:
438 1
            result = {"response": f"Circuit {circuit_id} is disabled."}
439 1
            status = 409
440
441 1
        return jsonify(result), status
442
443 1
    @rest("/v2/evc/schedule", methods=["GET"])
444 1
    def list_schedules(self):
445
        """Endpoint to return all schedules stored for all circuits.
446
447
        Return a JSON with the following template:
448
        [{"schedule_id": <schedule_id>,
449
         "circuit_id": <circuit_id>,
450
         "schedule": <schedule object>}]
451
        """
452 1
        log.debug("list_schedules /v2/evc/schedule")
453 1
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
454 1
        if not circuits:
455 1
            result = {}
456 1
            status = 200
457 1
            return jsonify(result), status
458
459 1
        result = []
460 1
        status = 200
461 1
        for circuit in circuits:
462 1
            circuit_scheduler = circuit.get("circuit_scheduler")
463 1
            if circuit_scheduler:
464 1
                for scheduler in circuit_scheduler:
465 1
                    value = {
466
                        "schedule_id": scheduler.get("id"),
467
                        "circuit_id": circuit.get("id"),
468
                        "schedule": scheduler,
469
                    }
470 1
                    result.append(value)
471
472 1
        log.debug("list_schedules result %s %s", result, status)
473 1
        return jsonify(result), status
474
475 1
    @rest("/v2/evc/schedule/", methods=["POST"])
476 1
    def create_schedule(self):
477
        """
478
        Create a new schedule for a given circuit.
479
480
        This service do no check if there are conflicts with another schedule.
481
        Payload example:
482
            {
483
              "circuit_id":"aa:bb:cc",
484
              "schedule": {
485
                "date": "2019-08-07T14:52:10.967Z",
486
                "interval": "string",
487
                "frequency": "1 * * * *",
488
                "action": "create"
489
              }
490
            }
491
        """
492 1
        log.debug("create_schedule /v2/evc/schedule/")
493
494 1
        json_data = self._json_from_request("create_schedule")
495 1
        try:
496 1
            circuit_id = json_data["circuit_id"]
497 1
        except TypeError:
498 1
            result = "The payload should have a dictionary."
499 1
            log.debug("create_schedule result %s %s", result, 400)
500 1
            raise BadRequest(result) from BadRequest
501 1
        except KeyError:
502 1
            result = "Missing circuit_id."
503 1
            log.debug("create_schedule result %s %s", result, 400)
504 1
            raise BadRequest(result) from BadRequest
505
506 1
        try:
507 1
            schedule_data = json_data["schedule"]
508 1
        except KeyError:
509 1
            result = "Missing schedule data."
510 1
            log.debug("create_schedule result %s %s", result, 400)
511 1
            raise BadRequest(result) from BadRequest
512
513
        # Get EVC from circuits buffer
514 1
        circuits = self._get_circuits_buffer()
515
516
        # get the circuit
517 1
        evc = circuits.get(circuit_id)
518
519
        # get the circuit
520 1
        if not evc:
521 1
            result = f"circuit_id {circuit_id} not found"
522 1
            log.debug("create_schedule result %s %s", result, 404)
523 1
            raise NotFound(result) from NotFound
524
        # Can not modify circuits deleted and archived
525 1
        if evc.archived:
526 1
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
527 1
            log.debug("create_schedule result %s %s", result, 403)
528 1
            raise Forbidden(result) from Forbidden
529
530
        # new schedule from dict
531 1
        new_schedule = CircuitSchedule.from_dict(schedule_data)
532
533
        # If there is no schedule, create the list
534 1
        if not evc.circuit_scheduler:
535 1
            evc.circuit_scheduler = []
536
537
        # Add the new schedule
538 1
        evc.circuit_scheduler.append(new_schedule)
539
540
        # Add schedule job
541 1
        self.sched.add_circuit_job(evc, new_schedule)
542
543
        # save circuit to mongodb
544 1
        evc.sync()
545
546 1
        result = new_schedule.as_dict()
547 1
        status = 201
548
549 1
        log.debug("create_schedule result %s %s", result, status)
550 1
        return jsonify(result), status
551
552 1
    @rest("/v2/evc/schedule/<schedule_id>", methods=["PATCH"])
553 1
    def update_schedule(self, schedule_id):
554
        """Update a schedule.
555
556
        Change all attributes from the given schedule from a EVC circuit.
557
        The schedule ID is preserved as default.
558
        Payload example:
559
            {
560
              "date": "2019-08-07T14:52:10.967Z",
561
              "interval": "string",
562
              "frequency": "1 * * *",
563
              "action": "create"
564
            }
565
        """
566 1
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)
567
568
        # Try to find a circuit schedule
569 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
570
571
        # Can not modify circuits deleted and archived
572 1
        if not found_schedule:
573 1
            result = f"schedule_id {schedule_id} not found"
574 1
            log.debug("update_schedule result %s %s", result, 404)
575 1
            raise NotFound(result) from NotFound
576 1
        if evc.archived:
577 1
            result = f"Circuit {evc.id} is archived. Update is forbidden."
578 1
            log.debug("update_schedule result %s %s", result, 403)
579 1
            raise Forbidden(result) from Forbidden
580
581 1
        data = self._json_from_request("update_schedule")
582
583 1
        new_schedule = CircuitSchedule.from_dict(data)
584 1
        new_schedule.id = found_schedule.id
585
        # Remove the old schedule
586 1
        evc.circuit_scheduler.remove(found_schedule)
587
        # Append the modified schedule
588 1
        evc.circuit_scheduler.append(new_schedule)
589
590
        # Cancel all schedule jobs
591 1
        self.sched.cancel_job(found_schedule.id)
592
        # Add the new circuit schedule
593 1
        self.sched.add_circuit_job(evc, new_schedule)
594
        # Save EVC to mongodb
595 1
        evc.sync()
596
597 1
        result = new_schedule.as_dict()
598 1
        status = 200
599
600 1
        log.debug("update_schedule result %s %s", result, status)
601 1
        return jsonify(result), status
602
603 1
    @rest("/v2/evc/schedule/<schedule_id>", methods=["DELETE"])
604 1
    def delete_schedule(self, schedule_id):
605
        """Remove a circuit schedule.
606
607
        Remove the Schedule from EVC.
608
        Remove the Schedule from cron job.
609
        Save the EVC to the Storehouse.
610
        """
611 1
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
612 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
613
614
        # Can not modify circuits deleted and archived
615 1
        if not found_schedule:
616 1
            result = f"schedule_id {schedule_id} not found"
617 1
            log.debug("delete_schedule result %s %s", result, 404)
618 1
            raise NotFound(result)
619
620 1
        if evc.archived:
621 1
            result = f"Circuit {evc.id} is archived. Update is forbidden."
622 1
            log.debug("delete_schedule result %s %s", result, 403)
623 1
            raise Forbidden(result)
624
625
        # Remove the old schedule
626 1
        evc.circuit_scheduler.remove(found_schedule)
627
628
        # Cancel all schedule jobs
629 1
        self.sched.cancel_job(found_schedule.id)
630
        # Save EVC to mongodb
631 1
        evc.sync()
632
633 1
        result = "Schedule removed"
634 1
        status = 200
635
636 1
        log.debug("delete_schedule result %s %s", result, status)
637 1
        return jsonify(result), status
638
639 1
    def _is_duplicated_evc(self, evc):
640
        """Verify if the circuit given is duplicated with the stored evcs.
641
642
        Args:
643
            evc (EVC): circuit to be analysed.
644
645
        Returns:
646
            boolean: True if the circuit is duplicated, otherwise False.
647
648
        """
649 1
        for circuit in tuple(self.circuits.values()):
650 1
            if not circuit.archived and circuit.shares_uni(evc):
651 1
                return True
652 1
        return False
653
654 1
    @listen_to("kytos/topology.link_up")
655 1
    def on_link_up(self, event):
656
        """Change circuit when link is up or end_maintenance."""
657
        self.handle_link_up(event)
658
659 1
    def handle_link_up(self, event):
660
        """Change circuit when link is up or end_maintenance."""
661 1
        log.info("Event handle_link_up %s", event.content["link"])
662 1
        for evc in self.get_evcs_by_svc_level():
663 1
            if evc.is_enabled() and not evc.archived:
664 1
                with evc.lock:
665 1
                    evc.handle_link_up(event.content["link"])
666
667 1
    @listen_to("kytos/topology.link_down")
668 1
    def on_link_down(self, event):
669
        """Change circuit when link is down or under_mantenance."""
670
        self.handle_link_down(event)
671
672 1
    def handle_link_down(self, event):
673
        """Change circuit when link is down or under_mantenance."""
674 1
        link = event.content["link"]
675 1
        log.info("Event handle_link_down %s", link)
676 1
        switch_flows = {}
677 1
        evcs_with_failover = []
678 1
        evcs_normal = []
679 1
        check_failover = []
680 1
        for evc in self.get_evcs_by_svc_level():
681 1
            if evc.is_affected_by_link(link):
682
                # if there is no failover path, handles link down the
683
                # tradditional way
684 1
                if (
685
                    not getattr(evc, 'failover_path', None) or
686
                    evc.is_failover_path_affected_by_link(link)
687
                ):
688 1
                    evcs_normal.append(evc)
689 1
                    continue
690 1
                for dpid, flows in evc.get_failover_flows().items():
691 1
                    switch_flows.setdefault(dpid, [])
692 1
                    switch_flows[dpid].extend(flows)
693 1
                evcs_with_failover.append(evc)
694
            else:
695 1
                check_failover.append(evc)
696
697 1
        offset = 0
698 1
        while switch_flows:
699 1
            offset = (offset + settings.BATCH_SIZE) or None
700 1
            switches = list(switch_flows.keys())
701 1
            for dpid in switches:
702 1
                emit_event(
703
                    self.controller,
704
                    context="kytos.flow_manager",
705
                    name="flows.install",
706
                    content={
707
                        "dpid": dpid,
708
                        "flow_dict": {"flows": switch_flows[dpid][:offset]},
709
                    }
710
                )
711 1
                if offset is None or offset >= len(switch_flows[dpid]):
712 1
                    del switch_flows[dpid]
713 1
                    continue
714 1
                switch_flows[dpid] = switch_flows[dpid][offset:]
715 1
            time.sleep(settings.BATCH_INTERVAL)
716
717 1
        for evc in evcs_with_failover:
718 1
            with evc.lock:
719 1
                old_path = evc.current_path
720 1
                evc.current_path = evc.failover_path
721 1
                evc.failover_path = old_path
722 1
                evc.sync()
723 1
            emit_event(self.controller, "redeployed_link_down",
724
                       content=map_evc_event_content(evc))
725 1
            log.info(
726
                f"{evc} redeployed with failover due to link down {link.id}"
727
            )
728
729 1
        for evc in evcs_normal:
730 1
            emit_event(
731
                self.controller,
732
                "evc_affected_by_link_down",
733
                content={"link_id": link.id} | map_evc_event_content(evc)
734
            )
735
736
        # After handling the hot path, check if new failover paths are needed.
737
        # Note that EVCs affected by link down will generate a KytosEvent for
738
        # deployed|redeployed, which will trigger the failover path setup.
739
        # Thus, we just need to further check the check_failover list
740 1
        for evc in check_failover:
741 1
            if evc.is_failover_path_affected_by_link(link):
742 1
                evc.setup_failover_path()
743
744 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
745 1
    def on_evc_affected_by_link_down(self, event):
746
        """Change circuit when link is down or under_mantenance."""
747
        self.handle_evc_affected_by_link_down(event)
748
749 1
    def handle_evc_affected_by_link_down(self, event):
750
        """Change circuit when link is down or under_mantenance."""
751 1
        evc = self.circuits.get(event.content["evc_id"])
752 1
        link_id = event.content['link_id']
753 1
        if not evc:
754 1
            return
755 1
        with evc.lock:
756 1
            result = evc.handle_link_down()
757 1
        event_name = "error_redeploy_link_down"
758 1
        if result:
759 1
            log.info(f"{evc} redeployed due to link down {link_id}")
760 1
            event_name = "redeployed_link_down"
761 1
        emit_event(self.controller, event_name,
762
                   content=map_evc_event_content(evc))
763
764 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
765 1
    def on_evc_deployed(self, event):
766
        """Handle EVC deployed|redeployed_link_down."""
767
        self.handle_evc_deployed(event)
768
769 1
    def handle_evc_deployed(self, event):
770
        """Setup failover path on evc deployed."""
771 1
        evc = self.circuits.get(event.content["evc_id"])
772 1
        if not evc:
773 1
            return
774 1
        with evc.lock:
775 1
            evc.setup_failover_path()
776
777 1
    @listen_to("kytos/topology.topology_loaded")
778 1
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
779
        """Load EVCs once the topology is available."""
780
        self.load_all_evcs()
781
782 1
    def load_all_evcs(self):
783
        """Try to load all EVCs on startup."""
784 1
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
785 1
        for circuit_id, circuit in circuits:
786 1
            if circuit_id not in self.circuits:
787 1
                self._load_evc(circuit)
788
789 1
    def _load_evc(self, circuit_dict):
790
        """Load one EVC from mongodb to memory."""
791 1
        try:
792 1
            evc = self._evc_from_dict(circuit_dict)
793 1
        except ValueError as exception:
794 1
            log.error(
795
                f"Could not load EVC: dict={circuit_dict} error={exception}"
796
            )
797 1
            return None
798
799 1
        if evc.archived:
800 1
            return None
801 1
        evc.deactivate()
802 1
        evc.sync()
803 1
        self.circuits.setdefault(evc.id, evc)
804 1
        self.sched.add(evc)
805 1
        return evc
806
807 1
    @listen_to("kytos/flow_manager.flow.error")
808 1
    def on_flow_mod_error(self, event):
809
        """Handle flow mod errors related to an EVC."""
810
        self.handle_flow_mod_error(event)
811
812 1
    def handle_flow_mod_error(self, event):
813
        """Handle flow mod errors related to an EVC."""
814 1
        flow = event.content["flow"]
815 1
        command = event.content.get("error_command")
816 1
        if command != "add":
817
            return
818 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
819 1
        if evc:
820 1
            evc.remove_current_flows()
821
822 1
    def _evc_dict_with_instances(self, evc_dict):
823
        """Convert some dict values to instance of EVC classes.
824
825
        This method will convert: [UNI, Link]
826
        """
827 1
        data = evc_dict.copy()  # Do not modify the original dict
828 1
        for attribute, value in data.items():
829
            # Get multiple attributes.
830
            # Ex: uni_a, uni_z
831 1
            if "uni" in attribute:
832 1
                try:
833 1
                    data[attribute] = self._uni_from_dict(value)
834 1
                except ValueError as exception:
835 1
                    result = "Error creating UNI: Invalid value"
836 1
                    raise ValueError(result) from exception
837
838 1
            if attribute == "circuit_scheduler":
839 1
                data[attribute] = []
840 1
                for schedule in value:
841 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
842
843
            # Get multiple attributes.
844
            # Ex: primary_links,
845
            #     backup_links,
846
            #     current_links_cache,
847
            #     primary_links_cache,
848
            #     backup_links_cache
849 1
            if "links" in attribute:
850 1
                data[attribute] = [
851
                    self._link_from_dict(link) for link in value
852
                ]
853
854
            # Ex: current_path,
855
            #     primary_path,
856
            #     backup_path
857 1
            if "path" in attribute and attribute != "dynamic_backup_path":
858 1
                data[attribute] = Path(
859
                    [self._link_from_dict(link) for link in value]
860
                )
861
862 1
        return data
863
864 1
    def _evc_from_dict(self, evc_dict):
865 1
        data = self._evc_dict_with_instances(evc_dict)
866 1
        return EVC(self.controller, **data)
867
868 1
    def _uni_from_dict(self, uni_dict):
869
        """Return a UNI object from python dict."""
870 1
        if uni_dict is None:
871 1
            return False
872
873 1
        interface_id = uni_dict.get("interface_id")
874 1
        interface = self.controller.get_interface_by_id(interface_id)
875 1
        if interface is None:
876 1
            result = (
877
                "Error creating UNI:"
878
                + f"Could not instantiate interface {interface_id}"
879
            )
880 1
            raise ValueError(result) from ValueError
881
882 1
        tag_dict = uni_dict.get("tag", None)
883 1
        if tag_dict:
884 1
            tag = TAG.from_dict(tag_dict)
885
        else:
886 1
            tag = None
887 1
        uni = UNI(interface, tag)
888
889 1
        return uni
890
891 1
    def _link_from_dict(self, link_dict):
892
        """Return a Link object from python dict."""
893 1
        id_a = link_dict.get("endpoint_a").get("id")
894 1
        id_b = link_dict.get("endpoint_b").get("id")
895
896 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
897 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
898 1
        if not endpoint_a:
899 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
900 1
            raise ValueError(error_msg)
901 1
        if not endpoint_b:
902
            error_msg = f"Could not get interface endpoint_b id {id_b}"
903
            raise ValueError(error_msg)
904
905 1
        link = Link(endpoint_a, endpoint_b)
906 1
        if "metadata" in link_dict:
907 1
            link.extend_metadata(link_dict.get("metadata"))
908
909 1
        s_vlan = link.get_metadata("s_vlan")
910 1
        if s_vlan:
911 1
            tag = TAG.from_dict(s_vlan)
912 1
            if tag is False:
913
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
914
                raise ValueError(error_msg)
915 1
            link.update_metadata("s_vlan", tag)
916 1
        return link
917
918 1
    def _find_evc_by_schedule_id(self, schedule_id):
919
        """
920
        Find an EVC and CircuitSchedule based on schedule_id.
921
922
        :param schedule_id: Schedule ID
923
        :return: EVC and Schedule
924
        """
925 1
        circuits = self._get_circuits_buffer()
926 1
        found_schedule = None
927 1
        evc = None
928
929
        # pylint: disable=unused-variable
930 1
        for c_id, circuit in circuits.items():
931 1
            for schedule in circuit.circuit_scheduler:
932 1
                if schedule.id == schedule_id:
933 1
                    found_schedule = schedule
934 1
                    evc = circuit
935 1
                    break
936 1
            if found_schedule:
937 1
                break
938 1
        return evc, found_schedule
939
940 1
    def _get_circuits_buffer(self):
941
        """
942
        Return the circuit buffer.
943
944
        If the buffer is empty, try to load data from mongodb.
945
        """
946 1
        if not self.circuits:
947
            # Load circuits from mongodb to buffer
948 1
            circuits = self.mongo_controller.get_circuits()['circuits']
949 1
            for c_id, circuit in circuits.items():
950 1
                evc = self._evc_from_dict(circuit)
951 1
                self.circuits[c_id] = evc
952 1
        return self.circuits
953
954 1
    @staticmethod
955 1
    def _json_from_request(caller):
956
        """Return a json from request.
957
958
        If it was not possible to get a json from the request, log, for debug,
959
        who was the caller and the error that ocurred, and raise an
960
        Exception.
961
        """
962 1
        try:
963 1
            json_data = request.get_json()
964 1
        except ValueError as exception:
965
            log.error(exception)
966
            log.debug(f"{caller} result {exception} 400")
967
            raise BadRequest(str(exception)) from BadRequest
968 1
        except BadRequest:
969 1
            result = "The request is not a valid JSON."
970 1
            log.debug(f"{caller} result {result} 400")
971 1
            raise BadRequest(result) from BadRequest
972 1
        if json_data is None:
973 1
            result = "Content-Type must be application/json"
974 1
            log.debug(f"{caller} result {result} 415")
975 1
            raise UnsupportedMediaType(result)
976
        return json_data
977