Passed
Pull Request — master (#298)
by
unknown
07:15
created

build.main.Main.add_bulk_metadata()   C

Complexity

Conditions 9

Size

Total Lines 40
Code Lines 33

Duplication

Lines 40
Ratio 100 %

Code Coverage

Tests 23
CRAP Score 10.801

Importance

Changes 0
Metric Value
cc 9
eloc 33
nop 1
dl 40
loc 40
ccs 23
cts 32
cp 0.7188
crap 10.801
rs 6.6666
c 0
b 0
f 0
1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import time
7 1
from threading import Lock
8
9 1
from flask import jsonify, request
10 1
from pydantic import ValidationError
11 1
from werkzeug.exceptions import (BadRequest, Conflict, Forbidden,
12
                                 MethodNotAllowed, NotFound,
13
                                 UnsupportedMediaType)
14
15 1
from kytos.core import KytosNApp, log, rest
16 1
from kytos.core.helpers import listen_to
17 1
from kytos.core.interface import TAG, UNI
18 1
from kytos.core.link import Link
19 1
from napps.kytos.mef_eline import controllers, settings
20 1
from napps.kytos.mef_eline.exceptions import InvalidPath
21 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
22
                                          Path)
23 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
24 1
from napps.kytos.mef_eline.utils import (emit_event, load_spec,
25
                                         map_evc_event_content, validate)
26
27
28
# pylint: disable=too-many-public-methods
29 1
class Main(KytosNApp):
30
    """Main class of amlight/mef_eline NApp.
31
32
    This class is the entry point for this napp.
33
    """
34
35 1
    spec = load_spec()
36
37 1
    def setup(self):
        """Initialise the NApp state; stands in for ``__init__``.

        Builds, in this order: the circuit scheduler, the E-Line controller
        used for persistence (bootstrapping its indexes), the controller
        reference for ``DynamicPathManager``, the in-memory circuit buffer
        and the lock used by ``execute``. It then asks the base class to run
        as a loop with ``settings.DEPLOY_EVCS_INTERVAL`` and loads all EVCs.
        """
        # Scheduler for circuit events.
        self.sched = Scheduler()

        # Persistence layer used to save and load circuits.
        self.mongo_controller = self.get_eline_controller()
        self.mongo_controller.bootstrap_indexes()

        # Controller that will manage the dynamic paths.
        DynamicPathManager.set_controller(self.controller)

        # Buffer of created EVCs, keyed by circuit id.
        # Every create/update/delete must be synced to mongodb.
        self.circuits = {}

        # Guards the consistency routine against overlapping runs.
        self._lock = Lock()
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

        self.load_all_evcs()
63
64 1
    def get_evcs_by_svc_level(self) -> list:
65
        """Get circuits sorted by desc service level and asc creation_time.
66
67
        In the future, as more ops are offloaded it should be get from the DB.
68
        """
69 1
        return sorted(self.circuits.values(),
70
                      key=lambda x: (-x.service_level, x.creation_time))
71
72 1
    @staticmethod
    def get_eline_controller():
        """Build and return a new ``controllers.ELineController``."""
        eline_controller = controllers.ELineController()
        return eline_controller
76
77 1
    def execute(self):
78
        """Execute once when the napp is running."""
79 1
        if self._lock.locked():
80 1
            return
81 1
        log.debug("Starting consistency routine")
82 1
        with self._lock:
83 1
            self.execute_consistency()
84 1
        log.debug("Finished consistency routine")
85
86 1
    @staticmethod
87 1
    def should_be_checked(circuit):
88
        "Verify if the circuit meets the necessary conditions to be checked"
89
        # pylint: disable=too-many-boolean-expressions
90 1
        if (
91
                circuit.is_enabled()
92
                and not circuit.is_active()
93
                and not circuit.lock.locked()
94
                and not circuit.has_recent_removed_flow()
95
                and not circuit.is_recent_updated()
96
                # if a inter-switch EVC does not have current_path, it does not
97
                # make sense to run sdntrace on it
98
                and (circuit.is_intra_switch() or circuit.current_path)
99
                ):
100 1
            return True
101
        return False
102
103 1
    def execute_consistency(self):
        """Reconcile the buffered circuits with their traces and with the DB.

        Circuits selected by ``should_be_checked`` are verified through
        ``EVCDeploy.check_list_traces``: those reported as checked are
        activated and synced, the others accumulate ``execution_rounds`` and
        are redeployed once that counter exceeds
        ``settings.WAIT_FOR_OLD_PATH``. Circuits returned by the DB that are
        not in the buffer are loaded through ``_load_evc``.
        """
        unloaded = self.mongo_controller.get_circuits()['circuits']
        candidates = []
        for circuit in self.get_evcs_by_svc_level():
            # Whatever remains in ``unloaded`` afterwards is DB-only.
            unloaded.pop(circuit.id, None)
            if self.should_be_checked(circuit):
                candidates.append(circuit)

        trace_results = EVCDeploy.check_list_traces(candidates)
        for circuit in candidates:
            if trace_results.get(circuit.id):
                circuit.execution_rounds = 0
                log.info(f"{circuit} enabled but inactive - activating")
                with circuit.lock:
                    circuit.activate()
                    circuit.sync()
                continue

            circuit.execution_rounds += 1
            if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
                log.info(f"{circuit} enabled but inactive - redeploy")
                with circuit.lock:
                    circuit.deploy()

        for circuit_id, stored_evc in unloaded.items():
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
            self._load_evc(stored_evc)
129
130 1
    def shutdown(self):
131
        """Execute when your napp is unloaded.
132
133
        If you have some cleanup procedure, insert it here.
134
        """
135
136 1
    @rest("/v2/evc/", methods=["GET"])
    def list_circuits(self):
        """Endpoint listing the stored circuits.

        Query args:
            archived: ``"null"``, ``"true"`` or ``"false"`` (case
                insensitive); any other value, or its absence, lists only
                non archived EVCs.
            metadata_key / metadata_value: when ``metadata_key`` is given,
                ``{metadata_key: metadata_value}`` is passed to the
                controller as the ``match`` filter.
        """
        log.debug("list_circuits /v2/evc")
        archived_arg = request.args.get("archived", "false").lower()
        query = {
            "archived": {"null": None, "true": True, "false": False}.get(
                archived_arg, False
            )
        }
        metadata_key = request.args.get("metadata_key")
        metadata_value = request.args.get("metadata_value")
        if metadata_key:
            query["match"] = {metadata_key: metadata_value}
        found = self.mongo_controller.get_circuits(**query)
        return jsonify(found['circuits']), 200
156
157 1
    @rest("/v2/evc/<circuit_id>", methods=["GET"])
    def get_circuit(self, circuit_id):
        """Endpoint returning the stored circuit with the given id.

        Raises:
            NotFound: the controller returned nothing for ``circuit_id``.
        """
        log.debug("get_circuit /v2/evc/%s", circuit_id)
        circuit = self.mongo_controller.get_circuit(circuit_id)
        if circuit:
            log.debug("get_circuit result %s %s", circuit, 200)
            return jsonify(circuit), 200

        message = f"circuit_id {circuit_id} not found"
        log.debug("get_circuit result %s %s", message, 404)
        raise NotFound(message)
169
170 1
    @rest("/v2/evc/", methods=["POST"])
    @validate(spec)
    def create_circuit(self, data):
        """Create, persist and (when unscheduled) deploy a new circuit.

        Steps performed here:

        1. Build the EVC from the request payload.
        2. Validate ``primary_path`` and ``backup_path`` (when present)
           against the switches of both UNIs.
        3. Reject the request if a non-archived EVC sharing a UNI exists.
        4. Check that the EVC has a primary or a dynamic path.
        5. Save the EVC, keep it in the circuit buffer and add it to the
           scheduler.
        6. Deploy it right away when it has no ``circuit_scheduler``.
        7. Emit the ``created`` event and notify the user.

        Args:
            data: request payload (presumably injected by
                ``@validate(spec)`` - confirm against that decorator).

        Returns:
            tuple: JSON response ``{"circuit_id": <id>}`` and status 201.

        Raises:
            BadRequest: the EVC could not be built, a path is invalid, or a
                validation error happened while checking/saving it.
            Conflict: the EVC already exists.
        """
        log.debug("create_circuit /v2/evc/")

        # Try to create the circuit object
        try:
            evc = self._evc_from_dict(data)
        except ValueError as exception:
            log.debug("create_circuit result %s %s", exception, 400)
            # Chain to the caught error (not to the BadRequest class) so the
            # real cause stays visible in tracebacks.
            raise BadRequest(str(exception)) from exception

        # Both static paths go through the very same validation.
        for path_name in ("primary_path", "backup_path"):
            path = getattr(evc, path_name)
            if not path:
                continue
            try:
                path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise BadRequest(
                    f"{path_name} is not valid: {exception}"
                ) from exception

        # verify duplicated evc
        if self._is_duplicated_evc(evc):
            result = "The EVC already exists."
            log.debug("create_circuit result %s %s", result, 409)
            raise Conflict(result)

        try:
            evc._validate_has_primary_or_dynamic()
        except ValueError as exception:
            raise BadRequest(str(exception)) from exception

        # save circuit
        try:
            evc.sync()
        except ValidationError as exception:
            raise BadRequest(str(exception)) from exception

        # store circuit in dictionary
        self.circuits[evc.id] = evc

        # Schedule the circuit deploy
        self.sched.add(evc)

        # Circuit has no schedule, deploy now
        if not evc.circuit_scheduler:
            with evc.lock:
                evc.deploy()

        # Notify users
        result = {"circuit_id": evc.id}
        status = 201
        log.debug("create_circuit result %s %s", result, status)
        emit_event(self.controller, name="created",
                   content=map_evc_event_content(evc))
        return jsonify(result), status
263
264 1
    @listen_to('kytos/flow_manager.flow.removed')
    def on_flow_delete(self, event):
        """Listen to flow removal events and hand them to the handler."""
        self.handle_flow_delete(event)
268
269 1
    def handle_flow_delete(self, event):
        """Record on the owning EVC that one of its flows was removed.

        The EVC id is derived from the removed flow's cookie; events whose
        cookie does not map to a buffered circuit are ignored.
        """
        removed_flow = event.content["flow"]
        evc_id = EVC.get_id_from_cookie(removed_flow.cookie)
        evc = self.circuits.get(evc_id)
        if not evc:
            return
        log.debug("Flow removed in EVC %s", evc.id)
        evc.set_flow_removed_at()
276
277 1
    @rest("/v2/evc/<circuit_id>", methods=["PATCH"])
    def update(self, circuit_id):
        """Update a circuit based on the request payload.

        After ``evc.update`` returns its ``(enable, redeploy)`` pair:

        - an active EVC is removed when ``enable is False``, or removed and
          deployed again when ``redeploy is not None``;
        - an inactive EVC is deployed when ``enable is True``.

        NOTE(review): the original docstring states that the required
        attributes (name, uni_a, uni_z) can't be updated; that restriction
        is not enforced in this method - presumably in ``evc.update``.

        Returns:
            tuple: JSON response ``{<evc id>: <evc dict>}`` and status 200.

        Raises:
            NotFound: ``circuit_id`` is not in the circuit buffer.
            MethodNotAllowed: the EVC is archived.
            BadRequest: malformed JSON body or invalid attribute values.
            UnsupportedMediaType: the body is not ``application/json``.
        """
        log.debug("update /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("update result %s %s", result, 404)
            # Chain to the caught error rather than to an exception class.
            raise NotFound(result) from error

        if evc.archived:
            result = "Can't update archived EVC"
            log.debug("update result %s %s", result, 405)
            raise MethodNotAllowed(["GET"], result)

        try:
            data = request.get_json()
        except BadRequest as error:
            result = "The request body is not a well-formed JSON."
            log.debug("update result %s %s", result, 400)
            raise BadRequest(result) from error
        if data is None:
            result = "The request body mimetype is not application/json."
            log.debug("update result %s %s", result, 415)
            # No exception is being handled here, so there is no cause.
            raise UnsupportedMediaType(result)

        try:
            enable, redeploy = evc.update(
                **self._evc_dict_with_instances(data)
            )
        except ValidationError as exception:
            raise BadRequest(str(exception)) from exception
        except ValueError as exception:
            log.error(exception)
            log.debug("update result %s %s", exception, 400)
            raise BadRequest(str(exception)) from exception

        if evc.is_active():
            if enable is False:  # disable if active
                with evc.lock:
                    evc.remove()
            elif redeploy is not None:  # redeploy if active
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        elif enable is True:  # enable if inactive
            with evc.lock:
                evc.deploy()

        result = {evc.id: evc.as_dict()}
        status = 200

        log.debug("update result %s %s", result, status)
        emit_event(self.controller, "updated",
                   content=map_evc_event_content(evc, **data))
        return jsonify(result), status
337
338 1
    @rest("/v2/evc/<circuit_id>", methods=["DELETE"])
    def delete_circuit(self, circuit_id):
        """Remove a circuit.

        While holding the EVC lock: the current and failover flows are
        removed, the EVC is deactivated and disabled, dropped from the
        scheduler, archived and synced. A ``deleted`` event is emitted
        afterwards.

        Returns:
            tuple: JSON response with a confirmation message and status 200.

        Raises:
            NotFound: ``circuit_id`` is not in the circuit buffer, or the
                circuit is already archived.
        """
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("delete_circuit result %s %s", result, 404)
            # Chain to the caught error rather than to an exception class.
            raise NotFound(result) from error

        if evc.archived:
            result = f"Circuit {circuit_id} already removed"
            log.debug("delete_circuit result %s %s", result, 404)
            # No exception is being handled here, so there is no cause.
            raise NotFound(result)

        log.info("Removing %s", evc)
        with evc.lock:
            evc.remove_current_flows()
            evc.remove_failover_flows(sync=False)
            evc.deactivate()
            evc.disable()
            self.sched.remove(evc)
            evc.archive()
            evc.sync()
        log.info("EVC removed. %s", evc)
        result = {"response": f"Circuit {circuit_id} removed"}
        status = 200

        log.debug("delete_circuit result %s %s", result, status)
        emit_event(self.controller, "deleted",
                   content=map_evc_event_content(evc))
        return jsonify(result), status
375
376 1
    @rest("v2/evc/<circuit_id>/metadata", methods=["GET"])
    def get_metadata(self, circuit_id):
        """Return the metadata of the buffered EVC with the given id.

        Raises:
            NotFound: a ``KeyError`` was raised while building the response
                (typically ``circuit_id`` missing from the buffer).
        """
        try:
            evc_metadata = self.circuits[circuit_id].metadata
            return jsonify({"metadata": evc_metadata}), 200
        except KeyError as error:
            raise NotFound(f"circuit_id {circuit_id} not found.") from error
386
387 1 View Code Duplication
    @rest("v2/evc/metadata", methods=["POST"])
    def add_bulk_metadata(self):
        """Add metadata to a bulk of EVCs.

        The JSON body must be an object holding ``circuit_ids`` plus the
        metadata keys/values to set. The keys are written through
        ``mongo_controller.update_bulk_evc`` (as ``metadata.<key>``) and
        then applied to each buffered EVC.

        Returns:
            tuple: JSON confirmation message and status 201.

        Raises:
            BadRequest: malformed JSON, empty body, body that is not a JSON
                object, or ``circuit_ids`` missing.
            UnsupportedMediaType: no JSON payload could be read.
            NotFound: at least one id is not in the circuit buffer (ids that
                were found have already been updated by then).
        """
        try:
            metadata = request.get_json()
            content_type = request.content_type
        except BadRequest as error:
            result = "The request body is not a well-formed JSON."
            raise BadRequest(result) from error
        if content_type is None:
            result = "The request body is empty."
            raise BadRequest(result)
        if metadata is None:
            if content_type != "application/json":
                result = (
                    "The content type must be application/json "
                    f"(received {content_type})."
                )
            else:
                result = "Metadata is empty."
            raise UnsupportedMediaType(result)
        # Valid JSON may still be a list/string/number; without this guard
        # the ``pop`` below raises AttributeError and surfaces as a 500.
        if not isinstance(metadata, dict):
            raise BadRequest("The request body must be a JSON object.")

        circuit_ids = metadata.pop("circuit_ids", None)
        if circuit_ids is None:
            raise BadRequest("The key 'circuit_ids' is missing.")
        payload = {f"metadata.{k}": v for k, v in metadata.items()}
        self.mongo_controller.update_bulk_evc(circuit_ids, {"$set": payload})

        fail_evcs = set()
        for _id in circuit_ids:
            try:
                evc = self.circuits[_id]
                evc.extend_metadata(metadata)
                evc.sync()
            except KeyError:
                fail_evcs.add(_id)

        if fail_evcs:
            raise NotFound(f"Circuits {fail_evcs} were not found")
        return jsonify("Operation successful"), 201
427
428 1
    @rest("v2/evc/<circuit_id>/metadata", methods=["POST"])
    def add_metadata(self, circuit_id):
        """Extend the metadata of one EVC with the request's JSON payload.

        Returns:
            tuple: JSON confirmation message and status 201.

        Raises:
            BadRequest: malformed JSON or empty body.
            UnsupportedMediaType: no JSON payload could be read.
            NotFound: ``circuit_id`` is not in the circuit buffer.
        """
        try:
            body = request.get_json()
            mimetype = request.content_type
        except BadRequest as error:
            raise BadRequest(
                "The request body is not a well-formed JSON."
            ) from error

        if mimetype is None:
            raise BadRequest("The request body is empty.")

        if body is None:
            reason = "Metadata is empty."
            if mimetype != "application/json":
                reason = (
                    "The content type must be application/json "
                    f"(received {mimetype})."
                )
            raise UnsupportedMediaType(reason)

        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise NotFound(f"circuit_id {circuit_id} not found.") from error

        evc.extend_metadata(body)
        evc.sync()
        return jsonify("Operation successful"), 201
458
459 1 View Code Duplication
    @rest("v2/evc/metadata/<key>", methods=["DELETE"])
    def delete_bulk_metadata(self, key):
        """Delete a metadata key from a bulk of EVCs.

        The JSON body must be an object holding ``circuit_ids``. The key is
        unset through ``mongo_controller.update_bulk_evc`` (as
        ``metadata.<key>``) and then removed from each buffered EVC.

        Returns:
            tuple: JSON confirmation message and status 200.

        Raises:
            BadRequest: malformed JSON, empty body, body that is not a JSON
                object, or ``circuit_ids`` missing.
            UnsupportedMediaType: no JSON payload could be read.
            NotFound: at least one id is not in the circuit buffer (ids that
                were found have already been updated by then).
        """
        try:
            metadata = request.get_json()
            content_type = request.content_type
        except BadRequest as error:
            result = "The request body is not a well-formed JSON."
            raise BadRequest(result) from error
        if content_type is None:
            result = "The request body is empty."
            raise BadRequest(result)
        if metadata is None:
            if content_type != "application/json":
                result = (
                    "The content type must be application/json "
                    f"(received {content_type})."
                )
            else:
                result = "Request is empty."
            raise UnsupportedMediaType(result)
        # Valid JSON may still be a list/string/number; without this guard
        # the ``pop`` below raises AttributeError and surfaces as a 500.
        if not isinstance(metadata, dict):
            raise BadRequest("The request body must be a JSON object.")

        circuit_ids = metadata.pop("circuit_ids", None)
        if circuit_ids is None:
            raise BadRequest("The key 'circuit_ids' is missing.")
        payload = {f"metadata.{key}": ""}
        self.mongo_controller.update_bulk_evc(circuit_ids, {"$unset": payload})

        fail_evcs = set()
        for _id in circuit_ids:
            try:
                evc = self.circuits[_id]
                evc.remove_metadata(key)
                evc.sync()
            except KeyError:
                fail_evcs.add(_id)

        if fail_evcs:
            raise NotFound(f"Circuits {fail_evcs} were not found")
        return jsonify("Operation successful"), 200
499
500 1
    @rest("v2/evc/<circuit_id>/metadata/<key>", methods=["DELETE"])
    def delete_metadata(self, circuit_id, key):
        """Remove one metadata key from an EVC and sync it.

        Raises:
            NotFound: ``circuit_id`` is not in the circuit buffer.
        """
        try:
            target = self.circuits[circuit_id]
        except KeyError as error:
            raise NotFound(f"circuit_id {circuit_id} not found.") from error
        else:
            target.remove_metadata(key)
            target.sync()
        return jsonify("Operation successful"), 200
511
512 1
    @rest("/v2/evc/<circuit_id>/redeploy", methods=["PATCH"])
    def redeploy(self, circuit_id):
        """Endpoint to force the redeployment of an EVC.

        An enabled EVC has its current flows removed and is deployed again
        while holding its lock (status 202). A disabled EVC is left
        untouched and reported with status 409.

        Raises:
            NotFound: ``circuit_id`` is not in the circuit buffer.
        """
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            # Chain to the caught error rather than to an exception class.
            raise NotFound(result) from error

        if not evc.is_enabled():
            result = {"response": f"Circuit {circuit_id} is disabled."}
            return jsonify(result), 409

        with evc.lock:
            evc.remove_current_flows()
            evc.deploy()
        result = {"response": f"Circuit {circuit_id} redeploy received."}
        return jsonify(result), 202
532
533 1
    @rest("/v2/evc/schedule", methods=["GET"])
    def list_schedules(self):
        """Endpoint returning every schedule stored for every circuit.

        The JSON response follows this template:
        [{"schedule_id": <schedule_id>,
         "circuit_id": <circuit_id>,
         "schedule": <schedule object>}]

        When no circuit is stored at all, an empty JSON object is returned
        instead of a list.
        """
        log.debug("list_schedules /v2/evc/schedule")
        stored = self.mongo_controller.get_circuits()['circuits'].values()
        if not stored:
            return jsonify({}), 200

        schedules = []
        for circuit in stored:
            # Circuits without schedules contribute nothing.
            for entry in circuit.get("circuit_scheduler") or ():
                schedules.append(
                    {
                        "schedule_id": entry.get("id"),
                        "circuit_id": circuit.get("id"),
                        "schedule": entry,
                    }
                )

        log.debug("list_schedules result %s %s", schedules, 200)
        return jsonify(schedules), 200
564
565 1
    @rest("/v2/evc/schedule/", methods=["POST"])
    def create_schedule(self):
        """
        Create a new schedule for a given circuit.

        This service does not check if there are conflicts with another
        schedule.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }

        Returns:
            tuple: JSON representation of the new schedule and status 201.

        Raises:
            BadRequest: payload is not a dictionary, or ``circuit_id`` /
                ``schedule`` is missing.
            NotFound: the circuit is not in the circuits buffer.
            Forbidden: the circuit is archived.
        """
        log.debug("create_schedule /v2/evc/schedule/")

        json_data = self._json_from_request("create_schedule")
        try:
            circuit_id = json_data["circuit_id"]
        except TypeError as error:
            result = "The payload should have a dictionary."
            log.debug("create_schedule result %s %s", result, 400)
            # Chain to the caught error rather than to an exception class.
            raise BadRequest(result) from error
        except KeyError as error:
            result = "Missing circuit_id."
            log.debug("create_schedule result %s %s", result, 400)
            raise BadRequest(result) from error

        try:
            schedule_data = json_data["schedule"]
        except KeyError as error:
            result = "Missing schedule data."
            log.debug("create_schedule result %s %s", result, 400)
            raise BadRequest(result) from error

        # Get EVC from circuits buffer
        circuits = self._get_circuits_buffer()

        # get the circuit
        evc = circuits.get(circuit_id)

        if not evc:
            result = f"circuit_id {circuit_id} not found"
            log.debug("create_schedule result %s %s", result, 404)
            # No exception is being handled here, so there is no cause.
            raise NotFound(result)
        # Can not modify circuits deleted and archived
        if evc.archived:
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
            log.debug("create_schedule result %s %s", result, 403)
            raise Forbidden(result)

        # new schedule from dict
        new_schedule = CircuitSchedule.from_dict(schedule_data)

        # If there is no schedule, create the list
        if not evc.circuit_scheduler:
            evc.circuit_scheduler = []

        # Add the new schedule
        evc.circuit_scheduler.append(new_schedule)

        # Add schedule job
        self.sched.add_circuit_job(evc, new_schedule)

        # save circuit to mongodb
        evc.sync()

        result = new_schedule.as_dict()
        status = 201

        log.debug("create_schedule result %s %s", result, status)
        return jsonify(result), status
641
642 1
    @rest("/v2/evc/schedule/<schedule_id>", methods=["PATCH"])
    def update_schedule(self, schedule_id):
        """Update a schedule.

        Replace the given schedule of an EVC by a new one built from the
        payload. The schedule ID is preserved.
        Payload example:
            {
              "date": "2019-08-07T14:52:10.967Z",
              "interval": "string",
              "frequency": "1 * * * *",
              "action": "create"
            }

        Returns:
            tuple: JSON representation of the new schedule and status 200.

        Raises:
            NotFound: no schedule with ``schedule_id`` was found.
            Forbidden: the owning circuit is archived.
        """
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)

        # Try to find a circuit schedule
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)

        if not found_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("update_schedule result %s %s", result, 404)
            # No exception is being handled here, so there is no cause to
            # chain (the old ``from NotFound`` chained to the class itself).
            raise NotFound(result)
        # Can not modify circuits deleted and archived
        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("update_schedule result %s %s", result, 403)
            raise Forbidden(result)

        data = self._json_from_request("update_schedule")

        new_schedule = CircuitSchedule.from_dict(data)
        new_schedule.id = found_schedule.id
        # Remove the old schedule
        evc.circuit_scheduler.remove(found_schedule)
        # Append the modified schedule
        evc.circuit_scheduler.append(new_schedule)

        # Cancel all schedule jobs
        self.sched.cancel_job(found_schedule.id)
        # Add the new circuit schedule
        self.sched.add_circuit_job(evc, new_schedule)
        # Save EVC to mongodb
        evc.sync()

        result = new_schedule.as_dict()
        status = 200

        log.debug("update_schedule result %s %s", result, status)
        return jsonify(result), status
692
693 1
    @rest("/v2/evc/schedule/<schedule_id>", methods=["DELETE"])
    def delete_schedule(self, schedule_id):
        """Remove a circuit schedule.

        The schedule is removed from its EVC, its job is cancelled in the
        scheduler and the EVC is synced.

        Raises:
            NotFound: no schedule with ``schedule_id`` was found.
            Forbidden: the owning circuit is archived.
        """
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)

        if not found_schedule:
            message = f"schedule_id {schedule_id} not found"
            log.debug("delete_schedule result %s %s", message, 404)
            raise NotFound(message)

        # Can not modify circuits deleted and archived
        if evc.archived:
            message = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("delete_schedule result %s %s", message, 403)
            raise Forbidden(message)

        # Drop the schedule from the EVC, then cancel its job.
        evc.circuit_scheduler.remove(found_schedule)
        self.sched.cancel_job(found_schedule.id)
        # Save EVC to mongodb
        evc.sync()

        message = "Schedule removed"
        log.debug("delete_schedule result %s %s", message, 200)
        return jsonify(message), 200
728
729 1
    def _is_duplicated_evc(self, evc):
730
        """Verify if the circuit given is duplicated with the stored evcs.
731
732
        Args:
733
            evc (EVC): circuit to be analysed.
734
735
        Returns:
736
            boolean: True if the circuit is duplicated, otherwise False.
737
738
        """
739 1
        for circuit in tuple(self.circuits.values()):
740 1
            if not circuit.archived and circuit.shares_uni(evc):
741 1
                return True
742 1
        return False
743
744 1
    @listen_to("kytos/topology.link_up")
    def on_link_up(self, event):
        """Listen to link up events and hand them to the handler."""
        self.handle_link_up(event)
748
749 1
    def handle_link_up(self, event):
        """Let every enabled, non-archived EVC react to a link coming up.

        EVCs are visited in the order given by ``get_evcs_by_svc_level`` and
        each one handles the link while holding its own lock.
        """
        link = event.content["link"]
        log.info("Event handle_link_up %s", link)
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_enabled() or evc.archived:
                continue
            with evc.lock:
                evc.handle_link_up(link)
756
757 1
    @listen_to("kytos/topology.link_down")
    def on_link_down(self, event):
        """Listen to kytos/topology.link_down and delegate to handle_link_down.

        Args:
            event: the received event, forwarded unchanged.
        """
        self.handle_link_down(event)
761
762 1
    def handle_link_down(self, event):
        """Change circuits when a link goes down.

        Every EVC returned by ``get_evcs_by_svc_level()`` is classified:

        * affected by the link, with a failover path that is not itself
          affected: its failover flows are sent to flow_manager in batches,
          then ``current_path`` and ``failover_path`` are swapped, the EVC
          is synced and ``redeployed_link_down`` is emitted;
        * affected by the link, without a usable failover path: an
          ``evc_affected_by_link_down`` event is emitted for it;
        * not affected by the link: a new failover path is set up only if
          its failover path is affected by the link.

        Args:
            event: event whose ``content["link"]`` is the link that is down.
        """
        link = event.content["link"]
        log.info("Event handle_link_down %s", link)
        switch_flows = {}
        evcs_with_failover = []
        evcs_normal = []
        check_failover = []
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_affected_by_link(link):
                check_failover.append(evc)
                continue
            # if there is no failover path, handles link down the
            # traditional way
            if (
                not getattr(evc, 'failover_path', None) or
                evc.is_failover_path_affected_by_link(link)
            ):
                evcs_normal.append(evc)
                continue
            for dpid, flows in evc.get_failover_flows().items():
                switch_flows.setdefault(dpid, []).extend(flows)
            evcs_with_failover.append(evc)

        # The batch size is fixed for the whole loop. It used to be
        # accumulated on every round while the pending list was also
        # re-sliced, which made each batch larger than the previous one.
        # A missing, zero or negative BATCH_SIZE disables batching: slicing
        # with None sends all pending flows of a switch at once.
        batch_size = settings.BATCH_SIZE
        if not batch_size or batch_size < 0:
            batch_size = None

        while switch_flows:
            # Iterate over a snapshot of the keys because drained switches
            # are removed from the dict inside the loop.
            for dpid in list(switch_flows):
                pending = switch_flows[dpid]
                emit_event(
                    self.controller,
                    context="kytos.flow_manager",
                    name="flows.install",
                    content={
                        "dpid": dpid,
                        "flow_dict": {"flows": pending[:batch_size]},
                    }
                )
                if batch_size is None or batch_size >= len(pending):
                    del switch_flows[dpid]
                    continue
                switch_flows[dpid] = pending[batch_size:]
            time.sleep(settings.BATCH_INTERVAL)

        for evc in evcs_with_failover:
            with evc.lock:
                old_path = evc.current_path
                evc.current_path = evc.failover_path
                evc.failover_path = old_path
                evc.sync()
            emit_event(self.controller, "redeployed_link_down",
                       content=map_evc_event_content(evc))
            log.info(
                f"{evc} redeployed with failover due to link down {link.id}"
            )

        for evc in evcs_normal:
            emit_event(
                self.controller,
                "evc_affected_by_link_down",
                content={"link_id": link.id} | map_evc_event_content(evc)
            )

        # After handling the hot path, check if new failover paths are needed.
        # Note that EVCs affected by link down will generate a KytosEvent for
        # deployed|redeployed, which will trigger the failover path setup.
        # Thus, we just need to further check the check_failover list
        for evc in check_failover:
            if evc.is_failover_path_affected_by_link(link):
                evc.setup_failover_path()
833
834 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
    def on_evc_affected_by_link_down(self, event):
        """Listen to evc_affected_by_link_down and delegate to its handler.

        Args:
            event: the received event, forwarded unchanged to
                handle_evc_affected_by_link_down.
        """
        self.handle_evc_affected_by_link_down(event)
838
839 1
    def handle_evc_affected_by_link_down(self, event):
        """Run the EVC link down handling and emit the outcome as an event.

        Emits ``redeployed_link_down`` when ``evc.handle_link_down()`` returns
        a truthy value and ``error_redeploy_link_down`` otherwise. Nothing is
        done when the EVC id is not in ``self.circuits``.

        Args:
            event: event whose content holds ``evc_id`` and ``link_id``.
        """
        content = event.content
        evc = self.circuits.get(content["evc_id"])
        link_id = content['link_id']
        if not evc:
            return

        with evc.lock:
            redeployed = evc.handle_link_down()

        if redeployed:
            log.info(f"{evc} redeployed due to link down {link_id}")
            event_name = "redeployed_link_down"
        else:
            event_name = "error_redeploy_link_down"
        emit_event(self.controller, event_name,
                   content=map_evc_event_content(evc))
853
854 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
    def on_evc_deployed(self, event):
        """Listen to EVC deployed|redeployed_link_up|redeployed_link_down.

        Args:
            event: the received event, forwarded unchanged to
                handle_evc_deployed.
        """
        self.handle_evc_deployed(event)
858
859 1
    def handle_evc_deployed(self, event):
860
        """Setup failover path on evc deployed."""
861 1
        evc = self.circuits.get(event.content["evc_id"])
862 1
        if not evc:
863 1
            return
864 1
        with evc.lock:
865 1
            evc.setup_failover_path()
866
867 1
    @listen_to("kytos/topology.topology_loaded")
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
        """Listen to kytos/topology.topology_loaded and load all EVCs.

        The event itself is not inspected.
        """
        self.load_all_evcs()
871
872 1
    def load_all_evcs(self):
873
        """Try to load all EVCs on startup."""
874 1
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
875 1
        for circuit_id, circuit in circuits:
876 1
            if circuit_id not in self.circuits:
877 1
                self._load_evc(circuit)
878
879 1
    def _load_evc(self, circuit_dict):
880
        """Load one EVC from mongodb to memory."""
881 1
        try:
882 1
            evc = self._evc_from_dict(circuit_dict)
883 1
        except ValueError as exception:
884 1
            log.error(
885
                f"Could not load EVC: dict={circuit_dict} error={exception}"
886
            )
887 1
            return None
888
889 1
        if evc.archived:
890 1
            return None
891 1
        evc.deactivate()
892 1
        evc.sync()
893 1
        self.circuits.setdefault(evc.id, evc)
894 1
        self.sched.add(evc)
895 1
        return evc
896
897 1
    @listen_to("kytos/flow_manager.flow.error")
    def on_flow_mod_error(self, event):
        """Listen to kytos/flow_manager.flow.error and delegate handling.

        Args:
            event: the received event, forwarded unchanged to
                handle_flow_mod_error.
        """
        self.handle_flow_mod_error(event)
901
902 1
    def handle_flow_mod_error(self, event):
        """Remove the current flows of the EVC that owns a failed flow add.

        Only errors whose ``error_command`` is ``"add"`` are handled. The EVC
        is looked up by the id derived from the flow cookie; nothing happens
        when no such EVC is stored.

        Args:
            event: event whose content holds ``flow`` and, optionally,
                ``error_command``.
        """
        content = event.content
        flow = content["flow"]
        if content.get("error_command") != "add":
            return
        evc_id = EVC.get_id_from_cookie(flow.cookie)
        evc = self.circuits.get(evc_id)
        if evc:
            evc.remove_current_flows()
911
912 1
    def _evc_dict_with_instances(self, evc_dict):
913
        """Convert some dict values to instance of EVC classes.
914
915
        This method will convert: [UNI, Link]
916
        """
917 1
        data = evc_dict.copy()  # Do not modify the original dict
918 1
        for attribute, value in data.items():
919
            # Get multiple attributes.
920
            # Ex: uni_a, uni_z
921 1
            if "uni" in attribute:
922 1
                try:
923 1
                    data[attribute] = self._uni_from_dict(value)
924 1
                except ValueError as exception:
925 1
                    result = "Error creating UNI: Invalid value"
926 1
                    raise ValueError(result) from exception
927
928 1
            if attribute == "circuit_scheduler":
929 1
                data[attribute] = []
930 1
                for schedule in value:
931 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
932
933
            # Get multiple attributes.
934
            # Ex: primary_links,
935
            #     backup_links,
936
            #     current_links_cache,
937
            #     primary_links_cache,
938
            #     backup_links_cache
939 1
            if "links" in attribute:
940 1
                data[attribute] = [
941
                    self._link_from_dict(link) for link in value
942
                ]
943
944
            # Ex: current_path,
945
            #     primary_path,
946
            #     backup_path
947 1
            if "path" in attribute and attribute != "dynamic_backup_path":
948 1
                data[attribute] = Path(
949
                    [self._link_from_dict(link) for link in value]
950
                )
951
952 1
        return data
953
954 1
    def _evc_from_dict(self, evc_dict):
        """Return an EVC built from a python dict.

        The dict is first converted with ``_evc_dict_with_instances`` and the
        result is passed to EVC as keyword arguments.
        """
        evc_kwargs = self._evc_dict_with_instances(evc_dict)
        return EVC(self.controller, **evc_kwargs)
957
958 1
    def _uni_from_dict(self, uni_dict):
959
        """Return a UNI object from python dict."""
960 1
        if uni_dict is None:
961 1
            return False
962
963 1
        interface_id = uni_dict.get("interface_id")
964 1
        interface = self.controller.get_interface_by_id(interface_id)
965 1
        if interface is None:
966 1
            result = (
967
                "Error creating UNI:"
968
                + f"Could not instantiate interface {interface_id}"
969
            )
970 1
            raise ValueError(result) from ValueError
971
972 1
        tag_dict = uni_dict.get("tag", None)
973 1
        if tag_dict:
974 1
            tag = TAG.from_dict(tag_dict)
975
        else:
976 1
            tag = None
977 1
        uni = UNI(interface, tag)
978
979 1
        return uni
980
981 1
    def _link_from_dict(self, link_dict):
982
        """Return a Link object from python dict."""
983 1
        id_a = link_dict.get("endpoint_a").get("id")
984 1
        id_b = link_dict.get("endpoint_b").get("id")
985
986 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
987 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
988 1
        if not endpoint_a:
989 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
990 1
            raise ValueError(error_msg)
991 1
        if not endpoint_b:
992
            error_msg = f"Could not get interface endpoint_b id {id_b}"
993
            raise ValueError(error_msg)
994
995 1
        link = Link(endpoint_a, endpoint_b)
996 1
        if "metadata" in link_dict:
997 1
            link.extend_metadata(link_dict.get("metadata"))
998
999 1
        s_vlan = link.get_metadata("s_vlan")
1000 1
        if s_vlan:
1001 1
            tag = TAG.from_dict(s_vlan)
1002 1
            if tag is False:
1003
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
1004
                raise ValueError(error_msg)
1005 1
            link.update_metadata("s_vlan", tag)
1006 1
        return link
1007
1008 1
    def _find_evc_by_schedule_id(self, schedule_id):
1009
        """
1010
        Find an EVC and CircuitSchedule based on schedule_id.
1011
1012
        :param schedule_id: Schedule ID
1013
        :return: EVC and Schedule
1014
        """
1015 1
        circuits = self._get_circuits_buffer()
1016 1
        found_schedule = None
1017 1
        evc = None
1018
1019
        # pylint: disable=unused-variable
1020 1
        for c_id, circuit in circuits.items():
1021 1
            for schedule in circuit.circuit_scheduler:
1022 1
                if schedule.id == schedule_id:
1023 1
                    found_schedule = schedule
1024 1
                    evc = circuit
1025 1
                    break
1026 1
            if found_schedule:
1027 1
                break
1028 1
        return evc, found_schedule
1029
1030 1
    def _get_circuits_buffer(self):
1031
        """
1032
        Return the circuit buffer.
1033
1034
        If the buffer is empty, try to load data from mongodb.
1035
        """
1036 1
        if not self.circuits:
1037
            # Load circuits from mongodb to buffer
1038 1
            circuits = self.mongo_controller.get_circuits()['circuits']
1039 1
            for c_id, circuit in circuits.items():
1040 1
                evc = self._evc_from_dict(circuit)
1041 1
                self.circuits[c_id] = evc
1042 1
        return self.circuits
1043
1044 1
    @staticmethod
    def _json_from_request(caller):
        """Return the JSON payload of the current request.

        Args:
            caller (str): name of the caller, used only in the debug logs.

        Returns:
            The value returned by ``request.get_json()``.

        Raises:
            BadRequest: when ``request.get_json()`` raises ValueError or
                BadRequest.
            UnsupportedMediaType: when ``request.get_json()`` returns None.
        """
        try:
            json_data = request.get_json()
        except ValueError as exception:
            log.error(exception)
            log.debug(f"{caller} result {exception} 400")
            # Chain to the caught exception, not to the BadRequest class, so
            # the traceback keeps the real cause.
            raise BadRequest(str(exception)) from exception
        except BadRequest as exception:
            result = "The request is not a valid JSON."
            log.debug(f"{caller} result {result} 400")
            raise BadRequest(result) from exception
        if json_data is None:
            result = "Content-Type must be application/json"
            log.debug(f"{caller} result {result} 415")
            raise UnsupportedMediaType(result)
        return json_data
1067