Passed
Pull Request — master (#246)
by Italo Valcy
03:24
created

build.utils.compare_endpoint_trace()   A

Complexity

Conditions 3

Size

Total Lines 11
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 3

Importance

Changes 0
Metric Value
eloc 9
dl 0
loc 11
ccs 4
cts 4
cp 1
rs 9.95
c 0
b 0
f 0
cc 3
nop 3
crap 3
1
"""Utility functions."""
2 1
import functools
3 1
from pathlib import Path
4
5 1
from flask import request
6 1
from openapi_core import create_spec
7 1
from openapi_core.contrib.flask import FlaskOpenAPIRequest
8 1
from openapi_core.validation.request.validators import RequestValidator
9 1
from openapi_spec_validator import validate_spec
10 1
from openapi_spec_validator.readers import read_from_filename
11 1
from werkzeug.exceptions import BadRequest, UnsupportedMediaType
12
13 1
from kytos.core import log
14 1
from kytos.core.events import KytosEvent
15
16
17 1
def emit_event(controller, name, context="kytos/mef_eline", **kwargs):
    """Send an event when something happens with an EVC.

    The event is named ``"{context}.{name}"`` and carries ``kwargs`` as its
    content. It is put on the controller's ``buffers.app`` buffer.

    Args:
        controller: object exposing ``buffers.app.put``.
        name: event name suffix.
        context: event name prefix.
        **kwargs: used verbatim as the event content.
    """
    event_name = f"{context}.{name}"
    event = KytosEvent(name=event_name, content=kwargs)
    controller.buffers.app.put(event)
22
23
24 1
def notify_link_available_tags(controller, link, src_func=None):
    """Notify link available tags.

    Emits the ``link_available_tags`` event via ``emit_event``, with ``link``
    and ``src_func`` as the event content.
    """
    emit_event(controller, "link_available_tags", link=link, src_func=src_func)
27
28
29 1
def compare_endpoint_trace(endpoint, vlan, trace):
    """Compare an endpoint with a trace step.

    Args:
        endpoint: object exposing ``switch.dpid`` and ``port_number``.
        vlan: expected VLAN; ignored when falsy.
        trace: mapping with ``"dpid"`` and ``"port"`` keys and, optionally,
            a ``"vlan"`` key.

    Returns:
        True when dpid and port match the trace step. The VLAN must also
        match, but only when ``vlan`` is truthy and the step has a ``"vlan"``
        key.
    """
    same_location = (
        endpoint.switch.dpid == trace["dpid"]
        and endpoint.port_number == trace["port"]
    )
    # The VLAN is only checked when both sides actually provide one.
    if vlan and "vlan" in trace:
        return same_location and vlan == trace["vlan"]
    return same_location
41
42
43 1
def compare_uni_out_trace(uni, trace):
    """Check if the trace last step (output) matches the UNI attributes.

    Args:
        uni: object exposing ``interface.port_number`` and ``user_tag``
            (falsy, or an object with a ``value`` attribute).
        trace: mapping describing the trace step; may have an ``"out"`` key.

    Returns:
        True when ``trace`` has no ``"out"`` key. False when ``"out"`` is not
        a dict. Otherwise, whether both the port and the VLAN in ``"out"``
        match the UNI. A UNI without ``user_tag`` expects no VLAN (``None``).
    """
    # keep compatibility for old versions of sdntrace-cp
    if "out" not in trace:
        return True
    if not isinstance(trace["out"], dict):
        return False
    uni_vlan = uni.user_tag.value if uni.user_tag else None
    return (
        uni.interface.port_number == trace["out"].get("port")
        and uni_vlan == trace["out"].get("vlan")
    )
55
56
57 1
def uni_to_str(uni):
    """Create a string representation of the uni: dpid:portno[:vlan].

    The ``:vlan`` suffix is appended only when ``uni.user_tag`` is truthy,
    using ``uni.user_tag.value``.
    """
    dpid = uni.interface.switch.dpid
    port = uni.interface.port_number
    uni_str = f"{dpid}:{port}"
    if uni.user_tag:
        uni_str += f":{uni.user_tag.value}"
    return uni_str
65
66
67 1
def load_spec():
    """Load and validate the openapi spec.

    Reads ``openapi.yml`` from the directory containing this module, passes
    the resulting dict to ``validate_spec``, and returns the result of
    ``create_spec`` on that dict.
    """
    napp_dir = Path(__file__).parent
    yml_file = napp_dir / "openapi.yml"
    spec_dict, _ = read_from_filename(yml_file)

    validate_spec(spec_dict)

    return create_spec(spec_dict)
76
77
78 1
def validate(spec):
    """Decorator to validate a REST endpoint input.

    Validates the current Flask request against ``spec`` (the schema
    defined in the openapi.yml file) before calling the wrapped function.

    Args:
        spec: spec object handed to ``RequestValidator``.

    Returns:
        A decorator. The decorated function is called with the parsed JSON
        body as the extra keyword argument ``data``.

    Raises (from the decorated function):
        BadRequest: the body is not well-formed JSON, or validation against
            ``spec`` reported errors.
        UnsupportedMediaType: ``request.get_json()`` returned ``None``.
    """

    def validate_decorator(func):
        @functools.wraps(func)
        def wrapper_validate(*args, **kwargs):
            try:
                data = request.get_json()
            except BadRequest as err:
                result = "The request body is not a well-formed JSON."
                log.debug("create_circuit result %s %s", result, 400)
                # Chain from the caught instance so the real cause is kept.
                raise BadRequest(result) from err
            if data is None:
                result = "The request body mimetype is not application/json."
                log.debug("update result %s %s", result, 415)
                raise UnsupportedMediaType(result)

            validator = RequestValidator(spec)
            openapi_request = FlaskOpenAPIRequest(request)
            result = validator.validate(openapi_request)
            if result.errors:
                error_response = (
                    "The request body contains invalid API data."
                )
                # Only the first reported error is surfaced to the client.
                errors = result.errors[0]
                if hasattr(errors, "schema_errors"):
                    schema_errors = errors.schema_errors[0]
                    error_log = {
                        "error_message": schema_errors.message,
                        "error_validator": schema_errors.validator,
                        "error_validator_value": schema_errors.validator_value,
                        "error_path": list(schema_errors.path),
                        "error_schema": schema_errors.schema,
                        "error_schema_path": list(schema_errors.schema_path),
                    }
                    log.debug("Invalid request (API schema): %s", error_log)
                    error_response += f" {schema_errors.message} for field"
                    error_response += (
                        f" {'/'.join(map(str, schema_errors.path))}."
                    )
                # No exception is being handled here, so there is nothing to
                # chain from.
                raise BadRequest(error_response)
            return func(*args, data=data, **kwargs)

        return wrapper_validate

    return validate_decorator
128