Test Failed
Push — master ( a63869...63c1d9 )
by Antonio
04:17 queued 11s
created

build.utils   A

Complexity

Total Complexity 8

Size/Duplication

Total Lines 95
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 65
dl 0
loc 95
rs 10
c 0
b 0
f 0
wmc 8

4 Functions

Rating   Name   Duplication   Size   Complexity  
A emit_event() 0 5 1
A compare_endpoint_trace() 0 6 1
A load_spec() 0 9 1
B validate() 0 51 5
1
"""Utility functions."""
2
import functools
3
from pathlib import Path
4
5
from flask import request
6
from openapi_core import create_spec
7
from openapi_core.contrib.flask import FlaskOpenAPIRequest
8
from openapi_core.validation.request.validators import RequestValidator
9
from openapi_spec_validator import validate_spec
10
from openapi_spec_validator.readers import read_from_filename
11
from werkzeug.exceptions import BadRequest, UnsupportedMediaType
12
13
from kytos.core import log
14
from kytos.core.events import KytosEvent
15
16
17
def emit_event(controller, name, **kwargs):
    """Publish a ``kytos/mef_eline.<name>`` event on the app buffer.

    Every extra keyword argument is forwarded unchanged as the content
    of the event.
    """
    controller.buffers.app.put(
        KytosEvent(name=f"kytos/mef_eline.{name}", content=kwargs)
    )
22
23
24
def compare_endpoint_trace(endpoint, vlan, trace):
    """Tell whether a trace step matches an endpoint and a VLAN.

    The step matches when its ``"dpid"``, ``"port"`` and ``"vlan"``
    entries equal, respectively, ``endpoint.switch.dpid``,
    ``endpoint.port_number`` and ``vlan``.  The checks run in that order
    and stop at the first mismatch.
    """
    same_switch = endpoint.switch.dpid == trace["dpid"]
    if not same_switch:
        return same_switch

    same_port = endpoint.port_number == trace["port"]
    if not same_port:
        return same_port

    return vlan == trace["vlan"]
31
32
33
def load_spec():
    """Load the OpenAPI spec shipped next to this module.

    Reads ``openapi.yml`` from the directory that contains this file,
    runs it through ``validate_spec`` and returns the result of
    ``create_spec`` for the parsed content.
    """
    spec_path = Path(__file__).parent / "openapi.yml"
    raw_spec, _ignored = read_from_filename(spec_path)

    # Validation comes first so that a broken spec file is rejected
    # before any spec object is built from it.
    validate_spec(raw_spec)

    return create_spec(raw_spec)
42
43
44
def validate(spec):
    """Build a decorator that validates a REST endpoint's JSON input.

    The wrapped endpoint only runs when the current Flask request carries
    a JSON body that passes ``RequestValidator(spec)``; the decoded body
    is then handed to the endpoint as the ``data`` keyword argument.

    Args:
        spec: OpenAPI specification object given to ``RequestValidator``.

    Returns:
        A decorator to apply to a Flask view function.

    Raises (from the decorated function, at request time):
        BadRequest: the body is not well-formed JSON, or the validator
            reported at least one error.
        UnsupportedMediaType: ``request.get_json()`` returned ``None``.
    """

    def validate_decorator(func):
        @functools.wraps(func)
        def wrapper_validate(*args, **kwargs):
            try:
                data = request.get_json()
            except BadRequest as exc:
                result = "The request body is not a well-formed JSON."
                log.debug("validate result %s %s", result, 400)
                # Chain from the caught instance so the original parsing
                # error stays visible as the cause in tracebacks.
                raise BadRequest(result) from exc
            if data is None:
                result = "The request body mimetype is not application/json."
                log.debug("validate result %s %s", result, 415)
                raise UnsupportedMediaType(result)

            validator = RequestValidator(spec)
            openapi_request = FlaskOpenAPIRequest(request)
            result = validator.validate(openapi_request)
            if result.errors:
                # Only the first reported error is surfaced to the client.
                errors = result.errors[0]
                if hasattr(errors, "schema_errors"):
                    # Likewise, only the first schema error is reported.
                    schema_errors = errors.schema_errors[0]
                    error_log = {
                        "error_message": schema_errors.message,
                        "error_validator": schema_errors.validator,
                        "error_validator_value": schema_errors.validator_value,
                        "error_path": list(schema_errors.path),
                        "error_schema": schema_errors.schema,
                        "error_schema_path": list(schema_errors.schema_path),
                    }
                    log.debug("error response: %s", error_log)
                    field_path = "/".join(map(str, schema_errors.path))
                    error_response = (
                        f"{schema_errors.message} for field {field_path}."
                    )
                else:
                    # NOTE(review): every error without ``schema_errors``
                    # is reported as a mimetype problem - confirm that no
                    # other error kind can reach this branch.
                    error_response = (
                        "The request body mimetype is not application/json."
                    )
                # No exception is being handled here, so there is nothing
                # meaningful to chain from.
                raise BadRequest(error_response)
            return func(*args, data=data, **kwargs)

        return wrapper_validate

    return validate_decorator
95