Passed
Pull Request — master (#298)
by
unknown
03:36
created

build.main.Main.bulk_add_metadata()   A

Complexity

Conditions 5

Size

Total Lines 20
Code Lines 17

Duplication

Lines 20
Ratio 100 %

Code Coverage

Tests 17
CRAP Score 5

Importance

Changes 0
Metric Value
cc 5
eloc 17
nop 2
dl 20
loc 20
ccs 17
cts 17
cp 1
crap 5
rs 9.0833
c 0
b 0
f 0
1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import time
7 1
from threading import Lock
8
9 1
from flask import jsonify, request
10 1
from pydantic import ValidationError
11 1
from werkzeug.exceptions import (BadRequest, Conflict, Forbidden,
12
                                 MethodNotAllowed, NotFound,
13
                                 UnsupportedMediaType)
14
15 1
from kytos.core import KytosNApp, log, rest
16 1
from kytos.core.helpers import listen_to
17 1
from kytos.core.interface import TAG, UNI
18 1
from kytos.core.link import Link
19 1
from napps.kytos.mef_eline import controllers, settings
20 1
from napps.kytos.mef_eline.exceptions import InvalidPath
21 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
22
                                          Path)
23 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
24 1
from napps.kytos.mef_eline.utils import (emit_event, load_spec,
25
                                         map_evc_event_content, validate)
26
27
28
# pylint: disable=too-many-public-methods
29 1
class Main(KytosNApp):
30
    """Main class of amlight/mef_eline NApp.
31
32
    This class is the entry point for this napp.
33
    """
34
35 1
    spec = load_spec()
36
37 1
    def setup(self):
        """Initialise the NApp state; stands in for ``__init__``.

        The controller calls this method automatically when the application
        is loaded. It creates the scheduler and the persistence controller
        (bootstrapping its indexes), hands the Kytos controller over to
        ``DynamicPathManager``, prepares the circuit buffer and the lock
        used by ``execute``, requests loop execution with
        ``settings.DEPLOY_EVCS_INTERVAL`` and finally loads all EVCs.
        """
        # Scheduler used for circuit events.
        self.sched = Scheduler()

        # Controller used to save and load circuits.
        self.mongo_controller = self.get_eline_controller()
        self.mongo_controller.bootstrap_indexes()

        # Controller that will manage the dynamic paths.
        DynamicPathManager.set_controller(self.controller)

        # Buffer of the EVCs created, keyed by circuit id. Every
        # create/update/delete must also be synced to mongodb.
        self.circuits = {}

        # Held by ``execute`` while the consistency routine is running.
        self._lock = Lock()
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

        self.load_all_evcs()
63
64 1
    def get_evcs_by_svc_level(self) -> list:
65
        """Get circuits sorted by desc service level and asc creation_time.
66
67
        In the future, as more ops are offloaded it should be get from the DB.
68
        """
69 1
        return sorted(self.circuits.values(),
70
                      key=lambda x: (-x.service_level, x.creation_time))
71
72 1
    @staticmethod
    def get_eline_controller():
        """Build and return a new ``ELineController`` instance."""
        eline_controller = controllers.ELineController()
        return eline_controller
76
77 1
    def execute(self):
        """Run one round of the consistency routine.

        The round is skipped when ``self._lock`` is already held, so two
        rounds never overlap.
        """
        if not self._lock.locked():
            log.debug("Starting consistency routine")
            with self._lock:
                self.execute_consistency()
            log.debug("Finished consistency routine")
85
86 1
    @staticmethod
87 1
    def should_be_checked(circuit):
88
        "Verify if the circuit meets the necessary conditions to be checked"
89
        # pylint: disable=too-many-boolean-expressions
90 1
        if (
91
                circuit.is_enabled()
92
                and not circuit.is_active()
93
                and not circuit.lock.locked()
94
                and not circuit.has_recent_removed_flow()
95
                and not circuit.is_recent_updated()
96
                # if a inter-switch EVC does not have current_path, it does not
97
                # make sense to run sdntrace on it
98
                and (circuit.is_intra_switch() or circuit.current_path)
99
                ):
100 1
            return True
101
        return False
102
103 1
    def execute_consistency(self):
        """Execute the consistency routine.

        Traces every loaded circuit that ``should_be_checked`` accepts.
        Circuits whose trace check succeeds are activated and synced; the
        others accumulate ``execution_rounds`` and are redeployed once that
        counter exceeds ``settings.WAIT_FOR_OLD_PATH``. Circuits stored in
        mongodb but absent from the in-memory buffer are loaded at the end.
        """
        unloaded = self.mongo_controller.get_circuits()['circuits']
        candidates = []
        for evc in self.get_evcs_by_svc_level():
            # Whatever remains in ``unloaded`` afterwards is not in memory.
            unloaded.pop(evc.id, None)
            if self.should_be_checked(evc):
                candidates.append(evc)

        trace_results = EVCDeploy.check_list_traces(candidates)
        for evc in candidates:
            if trace_results.get(evc.id):
                evc.execution_rounds = 0
                log.info(f"{evc} enabled but inactive - activating")
                with evc.lock:
                    evc.activate()
                    evc.sync()
                continue

            evc.execution_rounds += 1
            if evc.execution_rounds > settings.WAIT_FOR_OLD_PATH:
                log.info(f"{evc} enabled but inactive - redeploy")
                with evc.lock:
                    evc.deploy()

        for circuit_id, stored_circuit in unloaded.items():
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
            self._load_evc(stored_circuit)
129
130 1
    def shutdown(self):
        """Run when the NApp is unloaded.

        Nothing needs to be cleaned up at the moment; any cleanup procedure
        belongs here.
        """
135
136 1
    @rest("/v2/evc/", methods=["GET"])
    def list_circuits(self):
        """Endpoint to return the stored circuits.

        The ``archived`` query argument accepts ``"null"``, ``"true"`` and
        ``"false"`` (mapped to ``None``, ``True`` and ``False``); a missing
        or unknown value falls back to ``False``, so by default only
        non-archived EVCs are listed. Every other query argument is
        forwarded as a metadata filter.
        """
        log.debug("list_circuits /v2/evc")
        query = request.args.to_dict()
        raw_archived = query.pop("archived", "false")
        archived = {"null": None, "true": True, "false": False}.get(
            raw_archived, False
        )
        stored = self.mongo_controller.get_circuits(archived=archived,
                                                    metadata=query)
        return jsonify(stored['circuits']), 200
152
153 1
    @rest("/v2/evc/<circuit_id>", methods=["GET"])
    def get_circuit(self, circuit_id):
        """Endpoint to return a single circuit looked up by its id.

        Raises:
            NotFound: no circuit is stored under ``circuit_id``.
        """
        log.debug("get_circuit /v2/evc/%s", circuit_id)
        stored = self.mongo_controller.get_circuit(circuit_id)
        if stored:
            log.debug("get_circuit result %s %s", stored, 200)
            return jsonify(stored), 200

        message = f"circuit_id {circuit_id} not found"
        log.debug("get_circuit result %s %s", message, 404)
        raise NotFound(message)
165
166 1
    @rest("/v2/evc/", methods=["POST"])
    @validate(spec)
    def create_circuit(self, data):
        """Try to create a new circuit.

        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
        UNI_Z's requested C-VID are available from the interfaces' pools. This
        is checked when creating the UNI object.

        Then the requested ``primary_path`` and ``backup_path`` (if any) are
        validated, duplicated EVCs are rejected, and the EVC is required to
        pass ``_validate_has_primary_or_dynamic``.

        The circuit is then synced to the database, stored in the in-memory
        buffer, handed to the scheduler and, when it has no
        ``circuit_scheduler``, deployed right away.

        Finally, a "created" event is emitted and the user is notified of the
        status of the request.

        Args:
            data (dict): request payload, already validated against the spec.

        Returns:
            tuple: JSON response ``{"circuit_id": <id>}`` and status 201.

        Raises:
            BadRequest: the payload cannot be turned into an EVC, a requested
                path is invalid, or the EVC fails validation when saved.
            Conflict: the EVC duplicates an existing, non-archived one.
        """
        log.debug("create_circuit /v2/evc/")

        # Try to create the circuit object
        try:
            evc = self._evc_from_dict(data)
        except ValueError as exception:
            log.debug("create_circuit result %s %s", exception, 400)
            # Chain to the caught error so the real cause stays visible.
            raise BadRequest(str(exception)) from exception

        # Same validation for both user-supplied paths, primary first.
        for attr in ("primary_path", "backup_path"):
            path = getattr(evc, attr)
            if not path:
                continue
            try:
                path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise BadRequest(
                    f"{attr} is not valid: {exception}"
                ) from exception

        # verify duplicated evc
        if self._is_duplicated_evc(evc):
            result = "The EVC already exists."
            log.debug("create_circuit result %s %s", result, 409)
            raise Conflict(result)

        try:
            evc._validate_has_primary_or_dynamic()
        except ValueError as exception:
            raise BadRequest(str(exception)) from exception

        # save circuit
        try:
            evc.sync()
        except ValidationError as exception:
            raise BadRequest(str(exception)) from exception

        # store circuit in dictionary
        self.circuits[evc.id] = evc

        # Schedule the circuit deploy
        self.sched.add(evc)

        # Circuit has no schedule, deploy now
        if not evc.circuit_scheduler:
            with evc.lock:
                evc.deploy()

        # Notify users
        result = {"circuit_id": evc.id}
        status = 201
        log.debug("create_circuit result %s %s", result, status)
        emit_event(self.controller, name="created",
                   content=map_evc_event_content(evc))
        return jsonify(result), status
259
260 1
    @listen_to('kytos/flow_manager.flow.removed')
    def on_flow_delete(self, event):
        """Listen for flow-removed events and delegate to the handler."""
        self.handle_flow_delete(event)
264
265 1
    def handle_flow_delete(self, event):
        """Record on the owning EVC that one of its flows was removed.

        The EVC id is derived from the cookie of the removed flow; events
        whose cookie does not map to a buffered circuit are ignored.
        """
        removed_flow = event.content["flow"]
        circuit_id = EVC.get_id_from_cookie(removed_flow.cookie)
        evc = self.circuits.get(circuit_id)
        if not evc:
            return
        log.debug("Flow removed in EVC %s", evc.id)
        evc.set_flow_removed_at()
272
273 1
    @rest("/v2/evc/<circuit_id>", methods=["PATCH"])
    def update(self, circuit_id):
        """Update a circuit based on the request payload.

        The EVC required attributes (name, uni_a, uni_z) can't be updated.

        After the attributes are updated, an active EVC is removed when the
        update disables it, or removed and deployed again when a redeploy is
        requested; an inactive EVC is deployed when the update enables it.
        An "updated" event is emitted at the end.

        Returns:
            tuple: JSON response ``{<evc id>: <evc dict>}`` and status 200.

        Raises:
            NotFound: ``circuit_id`` is not in the circuit buffer.
            MethodNotAllowed: the EVC is archived.
            BadRequest: the body is not well-formed JSON or the new values
                are rejected by the EVC.
            UnsupportedMediaType: the body is not ``application/json``.
        """
        log.debug("update /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("update result %s %s", result, 404)
            raise NotFound(result) from error

        if evc.archived:
            result = "Can't update archived EVC"
            log.debug("update result %s %s", result, 405)
            raise MethodNotAllowed(["GET"], result)

        try:
            data = request.get_json()
        except BadRequest as error:
            result = "The request body is not a well-formed JSON."
            log.debug("update result %s %s", result, 400)
            raise BadRequest(result) from error
        if data is None:
            result = "The request body mimetype is not application/json."
            log.debug("update result %s %s", result, 415)
            raise UnsupportedMediaType(result)

        try:
            enable, redeploy = evc.update(
                **self._evc_dict_with_instances(data)
            )
        # ValidationError must be handled before the broader ValueError.
        except ValidationError as exception:
            raise BadRequest(str(exception)) from exception
        except ValueError as exception:
            log.error(exception)
            log.debug("update result %s %s", exception, 400)
            raise BadRequest(str(exception)) from exception

        if evc.is_active():
            if enable is False:  # disable if active
                with evc.lock:
                    evc.remove()
            elif redeploy is not None:  # redeploy if active
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        elif enable is True:  # enable if inactive
            with evc.lock:
                evc.deploy()
        result = {evc.id: evc.as_dict()}
        status = 200

        log.debug("update result %s %s", result, status)
        emit_event(self.controller, "updated",
                   content=map_evc_event_content(evc, **data))
        return jsonify(result), status
333
334 1
    @rest("/v2/evc/<circuit_id>", methods=["DELETE"])
    def delete_circuit(self, circuit_id):
        """Remove a circuit.

        First, the flows are removed from the switches, and then the EVC is
        deactivated, disabled, removed from the scheduler, archived and
        synced, all while holding the EVC lock. A "deleted" event is emitted
        afterwards.

        Returns:
            tuple: JSON response with a confirmation message and status 200.

        Raises:
            NotFound: ``circuit_id`` is not in the circuit buffer, or the
                circuit is already archived.
        """
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("delete_circuit result %s %s", result, 404)
            raise NotFound(result) from error

        if evc.archived:
            result = f"Circuit {circuit_id} already removed"
            log.debug("delete_circuit result %s %s", result, 404)
            raise NotFound(result)

        log.info("Removing %s", evc)
        with evc.lock:
            evc.remove_current_flows()
            evc.remove_failover_flows(sync=False)
            evc.deactivate()
            evc.disable()
            self.sched.remove(evc)
            evc.archive()
            evc.sync()
        log.info("EVC removed. %s", evc)
        result = {"response": f"Circuit {circuit_id} removed"}
        status = 200

        log.debug("delete_circuit result %s %s", result, status)
        emit_event(self.controller, "deleted",
                   content=map_evc_event_content(evc))
        return jsonify(result), status
371
372 1
    @rest("v2/evc/<circuit_id>/metadata", methods=["GET"])
    def get_metadata(self, circuit_id):
        """Return the metadata of an EVC.

        Raises:
            NotFound: ``circuit_id`` is not in the circuit buffer.
        """
        try:
            metadata = self.circuits[circuit_id].metadata
        except KeyError as error:
            raise NotFound(f"circuit_id {circuit_id} not found.") from error
        return jsonify({"metadata": metadata}), 200
382
383 1 View Code Duplication
    @rest("v2/evc/metadata", methods=["POST"])
    @validate(spec)
    def bulk_add_metadata(self, data):
        """Add metadata to a bulk of EVCs.

        ``data`` must carry a ``circuit_ids`` list; the remaining keys are
        the metadata to add. The database is updated first, then each
        buffered EVC is extended in memory.

        Returns:
            tuple: a success message with status 201, or the list of ids
            missing from the buffer with status 404.

        Raises:
            BadRequest: ``circuit_ids`` is missing from the payload.
        """
        circuit_ids = data.pop("circuit_ids", None)
        if circuit_ids is None:
            raise BadRequest("The key 'circuit_ids' is missing.")
        self.mongo_controller.update_evcs(circuit_ids, data, "add")

        missing_ids = []
        for circuit_id in circuit_ids:
            try:
                self.circuits[circuit_id].extend_metadata(data)
            except KeyError:
                missing_ids.append(circuit_id)

        if not missing_ids:
            return jsonify("Operation successful"), 201
        return jsonify(missing_ids), 404
403
404 1
    @rest("v2/evc/<circuit_id>/metadata", methods=["POST"])
    def add_metadata(self, circuit_id):
        """Add metadata to an EVC.

        Returns:
            tuple: a success message and status 201.

        Raises:
            BadRequest: the body is empty or not well-formed JSON.
            UnsupportedMediaType: the body yields no metadata, either because
                the content type is not ``application/json`` or because the
                JSON payload is empty.
            NotFound: ``circuit_id`` is not in the circuit buffer.
        """
        try:
            metadata = request.get_json()
            content_type = request.content_type
        except BadRequest as error:
            raise BadRequest(
                "The request body is not a well-formed JSON."
            ) from error

        if content_type is None:
            raise BadRequest("The request body is empty.")

        if metadata is None:
            if content_type == "application/json":
                message = "Metadata is empty."
            else:
                message = (
                    "The content type must be application/json "
                    f"(received {content_type})."
                )
            raise UnsupportedMediaType(message)

        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise NotFound(f"circuit_id {circuit_id} not found.") from error

        evc.extend_metadata(metadata)
        evc.sync()
        return jsonify("Operation successful"), 201
434
435 1 View Code Duplication
    @rest("v2/evc/metadata/<key>", methods=["DELETE"])
    @validate(spec)
    def bulk_delete_metadata(self, data, key):
        """Delete a metadata key from a bulk of EVCs.

        ``data`` must carry a ``circuit_ids`` list. The database is updated
        first, then ``key`` is removed from each buffered EVC in memory.

        Returns:
            tuple: a success message with status 200, or the list of ids
            that failed with status 404.

        Raises:
            BadRequest: ``circuit_ids`` is missing from the payload.
        """
        circuit_ids = data.pop("circuit_ids", None)
        if circuit_ids is None:
            raise BadRequest("The key 'circuit_ids' is missing.")
        self.mongo_controller.update_evcs(circuit_ids, {key: ""}, "del")

        failed_ids = []
        for circuit_id in circuit_ids:
            try:
                self.circuits[circuit_id].remove_metadata(key)
            except KeyError:
                failed_ids.append(circuit_id)

        if not failed_ids:
            return jsonify("Operation successful"), 200
        return jsonify(failed_ids), 404
455
456 1
    @rest("v2/evc/<circuit_id>/metadata/<key>", methods=["DELETE"])
    def delete_metadata(self, circuit_id, key):
        """Delete one metadata key from an EVC and sync the EVC.

        Raises:
            NotFound: ``circuit_id`` is not in the circuit buffer.
        """
        try:
            target = self.circuits[circuit_id]
        except KeyError as error:
            raise NotFound(f"circuit_id {circuit_id} not found.") from error

        target.remove_metadata(key)
        target.sync()
        return jsonify("Operation successful"), 200
467
468 1
    @rest("/v2/evc/<circuit_id>/redeploy", methods=["PATCH"])
    def redeploy(self, circuit_id):
        """Endpoint to force the redeployment of an EVC.

        An enabled EVC has its current flows removed and is deployed again
        while holding its lock; a disabled EVC is left untouched.

        Returns:
            tuple: JSON response and status 202 when the redeploy was
            carried out, or status 409 when the circuit is disabled.

        Raises:
            NotFound: ``circuit_id`` is not in the circuit buffer.
        """
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            # Chain to the caught KeyError so the real cause stays visible.
            raise NotFound(result) from error
        if evc.is_enabled():
            with evc.lock:
                evc.remove_current_flows()
                evc.deploy()
            result = {"response": f"Circuit {circuit_id} redeploy received."}
            status = 202
        else:
            result = {"response": f"Circuit {circuit_id} is disabled."}
            status = 409

        return jsonify(result), status
488
489 1
    @rest("/v2/evc/schedule", methods=["GET"])
    def list_schedules(self):
        """Endpoint to return all schedules stored for all circuits.

        Return a JSON with the following template:
        [{"schedule_id": <schedule_id>,
         "circuit_id": <circuit_id>,
         "schedule": <schedule object>}]

        When no circuit is stored at all, an empty JSON object is returned
        instead of a list.
        """
        log.debug("list_schedules /v2/evc/schedule")
        stored = self.mongo_controller.get_circuits()['circuits'].values()
        if not stored:
            return jsonify({}), 200

        status = 200
        # Circuits without (or with an empty) scheduler contribute nothing.
        schedules = [
            {
                "schedule_id": job.get("id"),
                "circuit_id": circuit.get("id"),
                "schedule": job,
            }
            for circuit in stored
            for job in circuit.get("circuit_scheduler") or ()
        ]

        log.debug("list_schedules result %s %s", schedules, status)
        return jsonify(schedules), status
520
521 1
    @rest("/v2/evc/schedule/", methods=["POST"])
    def create_schedule(self):
        """
        Create a new schedule for a given circuit.

        This service does not check if there are conflicts with another
        schedule.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }

        Returns:
            tuple: JSON representation of the new schedule and status 201.

        Raises:
            BadRequest: the payload is not a dictionary, or it lacks
                ``circuit_id`` or ``schedule``.
            NotFound: ``circuit_id`` is not in the circuits buffer.
            Forbidden: the circuit is archived.
        """
        log.debug("create_schedule /v2/evc/schedule/")

        json_data = self._json_from_request("create_schedule")
        try:
            circuit_id = json_data["circuit_id"]
        except TypeError as error:
            result = "The payload should have a dictionary."
            log.debug("create_schedule result %s %s", result, 400)
            raise BadRequest(result) from error
        except KeyError as error:
            result = "Missing circuit_id."
            log.debug("create_schedule result %s %s", result, 400)
            raise BadRequest(result) from error

        try:
            schedule_data = json_data["schedule"]
        except KeyError as error:
            result = "Missing schedule data."
            log.debug("create_schedule result %s %s", result, 400)
            raise BadRequest(result) from error

        # Get EVC from circuits buffer
        circuits = self._get_circuits_buffer()

        # get the circuit
        evc = circuits.get(circuit_id)

        # The circuit must exist
        if not evc:
            result = f"circuit_id {circuit_id} not found"
            log.debug("create_schedule result %s %s", result, 404)
            raise NotFound(result)
        # Can not modify circuits deleted and archived
        if evc.archived:
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
            log.debug("create_schedule result %s %s", result, 403)
            raise Forbidden(result)

        # new schedule from dict
        new_schedule = CircuitSchedule.from_dict(schedule_data)

        # If there is no schedule, create the list
        if not evc.circuit_scheduler:
            evc.circuit_scheduler = []

        # Add the new schedule
        evc.circuit_scheduler.append(new_schedule)

        # Add schedule job
        self.sched.add_circuit_job(evc, new_schedule)

        # save circuit to mongodb
        evc.sync()

        result = new_schedule.as_dict()
        status = 201

        log.debug("create_schedule result %s %s", result, status)
        return jsonify(result), status
597
598 1
    @rest("/v2/evc/schedule/<schedule_id>", methods=["PATCH"])
    def update_schedule(self, schedule_id):
        """Update a schedule.

        Change all attributes from the given schedule from a EVC circuit.
        The schedule ID is preserved as default.
        Payload example:
            {
              "date": "2019-08-07T14:52:10.967Z",
              "interval": "string",
              "frequency": "1 * * *",
              "action": "create"
            }

        Returns:
            tuple: JSON representation of the new schedule and status 200.

        Raises:
            NotFound: no circuit holds a schedule with ``schedule_id``.
            Forbidden: the owning circuit is archived.
        """
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)

        # Try to find a circuit schedule
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)

        if not found_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("update_schedule result %s %s", result, 404)
            raise NotFound(result)
        # Can not modify circuits deleted and archived
        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("update_schedule result %s %s", result, 403)
            raise Forbidden(result)

        data = self._json_from_request("update_schedule")

        new_schedule = CircuitSchedule.from_dict(data)
        new_schedule.id = found_schedule.id
        # Remove the old schedule
        evc.circuit_scheduler.remove(found_schedule)
        # Append the modified schedule
        evc.circuit_scheduler.append(new_schedule)

        # Cancel all schedule jobs
        self.sched.cancel_job(found_schedule.id)
        # Add the new circuit schedule
        self.sched.add_circuit_job(evc, new_schedule)
        # Save EVC to mongodb
        evc.sync()

        result = new_schedule.as_dict()
        status = 200

        log.debug("update_schedule result %s %s", result, status)
        return jsonify(result), status
648
649 1
    @rest("/v2/evc/schedule/<schedule_id>", methods=["DELETE"])
    def delete_schedule(self, schedule_id):
        """Remove a circuit schedule.

        The schedule is detached from its EVC, its job is cancelled in the
        scheduler and the EVC is synced to mongodb.

        Raises:
            NotFound: no circuit holds a schedule with ``schedule_id``.
            Forbidden: the owning circuit is archived.
        """
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
        evc, schedule = self._find_evc_by_schedule_id(schedule_id)

        if not schedule:
            message = f"schedule_id {schedule_id} not found"
            log.debug("delete_schedule result %s %s", message, 404)
            raise NotFound(message)

        # Deleted and archived circuits can not be modified.
        if evc.archived:
            message = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("delete_schedule result %s %s", message, 403)
            raise Forbidden(message)

        # Drop the schedule from the EVC, then cancel its job.
        evc.circuit_scheduler.remove(schedule)
        self.sched.cancel_job(schedule.id)
        # Persist the EVC to mongodb.
        evc.sync()

        outcome, status = "Schedule removed", 200
        log.debug("delete_schedule result %s %s", outcome, status)
        return jsonify(outcome), status
684
685 1
    def _is_duplicated_evc(self, evc):
686
        """Verify if the circuit given is duplicated with the stored evcs.
687
688
        Args:
689
            evc (EVC): circuit to be analysed.
690
691
        Returns:
692
            boolean: True if the circuit is duplicated, otherwise False.
693
694
        """
695 1
        for circuit in tuple(self.circuits.values()):
696 1
            if not circuit.archived and circuit.shares_uni(evc):
697 1
                return True
698 1
        return False
699
700 1
    @listen_to("kytos/topology.link_up")
    def on_link_up(self, event):
        """Listen for link-up events and delegate to ``handle_link_up``."""
        self.handle_link_up(event)
704
705 1
    def handle_link_up(self, event):
        """Notify every enabled, non-archived EVC that a link came up.

        EVCs are visited in the order given by ``get_evcs_by_svc_level`` and
        each one is notified while holding its own lock.
        """
        log.info("Event handle_link_up %s", event.content["link"])
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_enabled() or evc.archived:
                continue
            with evc.lock:
                evc.handle_link_up(event.content["link"])
712
713 1
    @listen_to("kytos/topology.link_down")
    def on_link_down(self, event):
        """Listen for link-down events and delegate to ``handle_link_down``."""
        self.handle_link_down(event)
717
718 1
    def handle_link_down(self, event):
        """Change circuits when a link is down or under maintenance.

        EVCs affected by the link are split in two groups: those with a
        failover path not affected by the link, and the remaining ones.
        For the first group the failover flows are sent to flow_manager in
        batches of at most ``settings.BATCH_SIZE`` flows per switch (a zero
        or negative size sends everything at once), pausing
        ``settings.BATCH_INTERVAL`` after each round; then the current and
        failover paths are swapped, the EVC is synced and a
        "redeployed_link_down" event is emitted. For the second group an
        "evc_affected_by_link_down" event is emitted per EVC.

        EVCs not affected by the link get a new failover path set up when
        their failover path is affected.
        """
        link = event.content["link"]
        log.info("Event handle_link_down %s", link)
        switch_flows = {}
        evcs_with_failover = []
        evcs_normal = []
        check_failover = []
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_affected_by_link(link):
                check_failover.append(evc)
                continue
            # if there is no usable failover path, handle link down the
            # traditional way
            if (
                not getattr(evc, 'failover_path', None) or
                evc.is_failover_path_affected_by_link(link)
            ):
                evcs_normal.append(evc)
                continue
            for dpid, flows in evc.get_failover_flows().items():
                switch_flows.setdefault(dpid, []).extend(flows)
            evcs_with_failover.append(evc)

        # The pending lists are truncated after every round, so the slice
        # bound must stay constant; None means "no batching".
        configured_size = settings.BATCH_SIZE
        batch_size = None
        if configured_size and configured_size > 0:
            batch_size = configured_size
        while switch_flows:
            # Iterate over a copy of the keys: entries are deleted below.
            for dpid in list(switch_flows):
                pending = switch_flows[dpid]
                emit_event(
                    self.controller,
                    context="kytos.flow_manager",
                    name="flows.install",
                    content={
                        "dpid": dpid,
                        "flow_dict": {"flows": pending[:batch_size]},
                    }
                )
                if batch_size is None or batch_size >= len(pending):
                    del switch_flows[dpid]
                    continue
                switch_flows[dpid] = pending[batch_size:]
            time.sleep(settings.BATCH_INTERVAL)

        for evc in evcs_with_failover:
            with evc.lock:
                old_path = evc.current_path
                evc.current_path = evc.failover_path
                evc.failover_path = old_path
                evc.sync()
            emit_event(self.controller, "redeployed_link_down",
                       content=map_evc_event_content(evc))
            log.info(
                f"{evc} redeployed with failover due to link down {link.id}"
            )

        for evc in evcs_normal:
            emit_event(
                self.controller,
                "evc_affected_by_link_down",
                content={"link_id": link.id} | map_evc_event_content(evc)
            )

        # After handling the hot path, check if new failover paths are needed.
        # Note that EVCs affected by link down will generate a KytosEvent for
        # deployed|redeployed, which will trigger the failover path setup.
        # Thus, we just need to further check the check_failover list
        for evc in check_failover:
            if evc.is_failover_path_affected_by_link(link):
                evc.setup_failover_path()
789
790 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
    def on_evc_affected_by_link_down(self, event):
        """Listen for ``evc_affected_by_link_down`` and delegate the handling.

        The received event is forwarded unchanged to
        ``handle_evc_affected_by_link_down``.
        """
        self.handle_evc_affected_by_link_down(event)
794
795 1
    def handle_evc_affected_by_link_down(self, event):
        """Redeploy one EVC affected by a link down and report the outcome.

        :param event: event whose content carries ``evc_id`` and ``link_id``.

        Nothing is done when the EVC is not in ``self.circuits``. Otherwise
        ``evc.handle_link_down()`` runs under the EVC lock and either
        ``redeployed_link_down`` (truthy result) or
        ``error_redeploy_link_down`` is emitted.
        """
        content = event.content
        evc = self.circuits.get(content["evc_id"])
        link_id = content['link_id']
        if not evc:
            return
        with evc.lock:
            redeployed = evc.handle_link_down()
        if redeployed:
            log.info(f"{evc} redeployed due to link down {link_id}")
            event_name = "redeployed_link_down"
        else:
            event_name = "error_redeploy_link_down"
        emit_event(self.controller, event_name,
                   content=map_evc_event_content(evc))
809
810 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
    def on_evc_deployed(self, event):
        """Listen for EVC deployed|redeployed_link_(up|down) events.

        The received event is forwarded unchanged to ``handle_evc_deployed``.
        """
        self.handle_evc_deployed(event)
814
815 1
    def handle_evc_deployed(self, event):
        """Set up the failover path of the EVC named in the event.

        :param event: event whose ``content["evc_id"]`` identifies the EVC.

        EVCs missing from ``self.circuits`` are ignored; otherwise
        ``setup_failover_path`` is called while holding the EVC lock.
        """
        evc = self.circuits.get(event.content["evc_id"])
        if evc:
            with evc.lock:
                evc.setup_failover_path()
822
823 1
    @listen_to("kytos/topology.topology_loaded")
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
        """Load every EVC as soon as the topology reports it is loaded.

        The event itself is not inspected.
        """
        self.load_all_evcs()
827
828 1
    def load_all_evcs(self):
        """Load the stored EVCs that are not in memory yet.

        Circuits are read from ``mongo_controller.get_circuits()``; those
        whose id is already a key of ``self.circuits`` are skipped.
        """
        stored = self.mongo_controller.get_circuits()['circuits']
        for circuit_id, circuit in stored.items():
            if circuit_id in self.circuits:
                continue
            self._load_evc(circuit)
834
835 1
    def _load_evc(self, circuit_dict):
836
        """Load one EVC from mongodb to memory."""
837 1
        try:
838 1
            evc = self._evc_from_dict(circuit_dict)
839 1
        except ValueError as exception:
840 1
            log.error(
841
                f"Could not load EVC: dict={circuit_dict} error={exception}"
842
            )
843 1
            return None
844
845 1
        if evc.archived:
846 1
            return None
847 1
        evc.deactivate()
848 1
        evc.sync()
849 1
        self.circuits.setdefault(evc.id, evc)
850 1
        self.sched.add(evc)
851 1
        return evc
852
853 1
    @listen_to("kytos/flow_manager.flow.error")
    def on_flow_mod_error(self, event):
        """Listen for ``kytos/flow_manager.flow.error`` and delegate it.

        The received event is forwarded unchanged to
        ``handle_flow_mod_error``.
        """
        self.handle_flow_mod_error(event)
857
858 1
    def handle_flow_mod_error(self, event):
        """Remove the current flows of the EVC whose flow add failed.

        :param event: event whose content carries ``flow`` and, optionally,
            ``error_command``.

        Only errors with ``error_command == "add"`` are handled. The EVC is
        looked up by the id derived from the flow cookie; unknown EVCs are
        ignored.
        """
        content = event.content
        flow = content["flow"]
        if content.get("error_command") != "add":
            return
        evc_id = EVC.get_id_from_cookie(flow.cookie)
        evc = self.circuits.get(evc_id)
        if evc:
            evc.remove_current_flows()
867
868 1
    def _evc_dict_with_instances(self, evc_dict):
869
        """Convert some dict values to instance of EVC classes.
870
871
        This method will convert: [UNI, Link]
872
        """
873 1
        data = evc_dict.copy()  # Do not modify the original dict
874 1
        for attribute, value in data.items():
875
            # Get multiple attributes.
876
            # Ex: uni_a, uni_z
877 1
            if "uni" in attribute:
878 1
                try:
879 1
                    data[attribute] = self._uni_from_dict(value)
880 1
                except ValueError as exception:
881 1
                    result = "Error creating UNI: Invalid value"
882 1
                    raise ValueError(result) from exception
883
884 1
            if attribute == "circuit_scheduler":
885 1
                data[attribute] = []
886 1
                for schedule in value:
887 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
888
889
            # Get multiple attributes.
890
            # Ex: primary_links,
891
            #     backup_links,
892
            #     current_links_cache,
893
            #     primary_links_cache,
894
            #     backup_links_cache
895 1
            if "links" in attribute:
896 1
                data[attribute] = [
897
                    self._link_from_dict(link) for link in value
898
                ]
899
900
            # Ex: current_path,
901
            #     primary_path,
902
            #     backup_path
903 1
            if "path" in attribute and attribute != "dynamic_backup_path":
904 1
                data[attribute] = Path(
905
                    [self._link_from_dict(link) for link in value]
906
                )
907
908 1
        return data
909
910 1
    def _evc_from_dict(self, evc_dict):
        """Return an EVC built from ``evc_dict``.

        The dict values are first converted by ``_evc_dict_with_instances``
        and then passed to ``EVC`` as keyword arguments.
        """
        kwargs = self._evc_dict_with_instances(evc_dict)
        evc = EVC(self.controller, **kwargs)
        return evc
913
914 1
    def _uni_from_dict(self, uni_dict):
915
        """Return a UNI object from python dict."""
916 1
        if uni_dict is None:
917 1
            return False
918
919 1
        interface_id = uni_dict.get("interface_id")
920 1
        interface = self.controller.get_interface_by_id(interface_id)
921 1
        if interface is None:
922 1
            result = (
923
                "Error creating UNI:"
924
                + f"Could not instantiate interface {interface_id}"
925
            )
926 1
            raise ValueError(result) from ValueError
927
928 1
        tag_dict = uni_dict.get("tag", None)
929 1
        if tag_dict:
930 1
            tag = TAG.from_dict(tag_dict)
931
        else:
932 1
            tag = None
933 1
        uni = UNI(interface, tag)
934
935 1
        return uni
936
937 1
    def _link_from_dict(self, link_dict):
938
        """Return a Link object from python dict."""
939 1
        id_a = link_dict.get("endpoint_a").get("id")
940 1
        id_b = link_dict.get("endpoint_b").get("id")
941
942 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
943 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
944 1
        if not endpoint_a:
945 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
946 1
            raise ValueError(error_msg)
947 1
        if not endpoint_b:
948
            error_msg = f"Could not get interface endpoint_b id {id_b}"
949
            raise ValueError(error_msg)
950
951 1
        link = Link(endpoint_a, endpoint_b)
952 1
        if "metadata" in link_dict:
953 1
            link.extend_metadata(link_dict.get("metadata"))
954
955 1
        s_vlan = link.get_metadata("s_vlan")
956 1
        if s_vlan:
957 1
            tag = TAG.from_dict(s_vlan)
958 1
            if tag is False:
959
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
960
                raise ValueError(error_msg)
961 1
            link.update_metadata("s_vlan", tag)
962 1
        return link
963
964 1
    def _find_evc_by_schedule_id(self, schedule_id):
965
        """
966
        Find an EVC and CircuitSchedule based on schedule_id.
967
968
        :param schedule_id: Schedule ID
969
        :return: EVC and Schedule
970
        """
971 1
        circuits = self._get_circuits_buffer()
972 1
        found_schedule = None
973 1
        evc = None
974
975
        # pylint: disable=unused-variable
976 1
        for c_id, circuit in circuits.items():
977 1
            for schedule in circuit.circuit_scheduler:
978 1
                if schedule.id == schedule_id:
979 1
                    found_schedule = schedule
980 1
                    evc = circuit
981 1
                    break
982 1
            if found_schedule:
983 1
                break
984 1
        return evc, found_schedule
985
986 1
    def _get_circuits_buffer(self):
987
        """
988
        Return the circuit buffer.
989
990
        If the buffer is empty, try to load data from mongodb.
991
        """
992 1
        if not self.circuits:
993
            # Load circuits from mongodb to buffer
994 1
            circuits = self.mongo_controller.get_circuits()['circuits']
995 1
            for c_id, circuit in circuits.items():
996 1
                evc = self._evc_from_dict(circuit)
997 1
                self.circuits[c_id] = evc
998 1
        return self.circuits
999
1000 1
    @staticmethod
    def _json_from_request(caller):
        """Return the JSON payload of the current request.

        If it was not possible to get a JSON from the request, log, for
        debug, who was the caller and the error that occurred, and raise an
        exception.

        :param caller: name of the caller, used only in log messages.
        :return: the decoded JSON data.
        :raises BadRequest: when decoding the body fails.
        :raises UnsupportedMediaType: when ``get_json`` returns ``None``.
        """
        try:
            json_data = request.get_json()
        except ValueError as exception:
            log.error(exception)
            log.debug(f"{caller} result {exception} 400")
            # Chain the real error so the traceback shows the actual cause.
            raise BadRequest(str(exception)) from exception
        except BadRequest as exception:
            result = "The request is not a valid JSON."
            log.debug(f"{caller} result {result} 400")
            raise BadRequest(result) from exception
        if json_data is None:
            result = "Content-Type must be application/json"
            log.debug(f"{caller} result {result} 415")
            raise UnsupportedMediaType(result)
        return json_data
1023