|
1
|
|
|
"""Utility functions.""" |
|
2
|
1 |
|
import functools |
|
3
|
1 |
|
from pathlib import Path |
|
4
|
|
|
|
|
5
|
1 |
|
from flask import request |
|
6
|
1 |
|
from openapi_core import create_spec |
|
7
|
1 |
|
from openapi_core.contrib.flask import FlaskOpenAPIRequest |
|
8
|
1 |
|
from openapi_core.validation.request.validators import RequestValidator |
|
9
|
1 |
|
from openapi_spec_validator import validate_spec |
|
10
|
1 |
|
from openapi_spec_validator.readers import read_from_filename |
|
11
|
1 |
|
from werkzeug.exceptions import BadRequest, UnsupportedMediaType |
|
12
|
|
|
|
|
13
|
1 |
|
from kytos.core import log |
|
14
|
1 |
|
from kytos.core.events import KytosEvent |
|
15
|
|
|
|
|
16
|
|
|
|
|
17
|
1 |
|
def map_evc_event_content(evc):
    """Build the event content payload describing an EVC.

    Args:
        evc: Object exposing ``id``, ``name``, ``metadata``, ``_active``,
            ``_enabled`` and the ``uni_a``/``uni_z`` endpoints, each of
            which provides an ``as_dict()`` method.

    Returns:
        dict: The EVC identification, its state flags and both UNIs
        serialized through ``as_dict()``.
    """
    content = {
        "evc_id": evc.id,
        "name": evc.name,
        "metadata": evc.metadata,
        "active": evc._active,
        "enabled": evc._enabled,
    }
    # The UNIs are serialized last, in the same order as the original keys.
    content["uni_a"] = evc.uni_a.as_dict()
    content["uni_z"] = evc.uni_z.as_dict()
    return content
|
26
|
|
|
|
|
27
|
|
|
|
|
28
|
1 |
|
def emit_event(controller, name, context="kytos/mef_eline", content=None):
    """Publish a KytosEvent on the controller's app buffer.

    Args:
        controller: Object whose ``buffers.app.put`` receives the event.
        name: Event name suffix.
        context: Event name prefix; the full name is ``<context>.<name>``.
        content: Value forwarded as the event ``content``.
    """
    kytos_event = KytosEvent(name=f"{context}.{name}", content=content)
    app_buffer = controller.buffers.app
    app_buffer.put(kytos_event)
|
33
|
|
|
|
|
34
|
|
|
|
|
35
|
1 |
|
def notify_link_available_tags(controller, link, src_func=None):
    """Emit the ``link_available_tags`` event for a link.

    Args:
        controller: Controller forwarded to ``emit_event``.
        link: Value published under the ``"link"`` content key.
        src_func: Value published under the ``"src_func"`` content key.
    """
    payload = {"link": link, "src_func": src_func}
    emit_event(controller, "link_available_tags", content=payload)
|
41
|
|
|
|
|
42
|
|
|
|
|
43
|
1 |
|
def compare_endpoint_trace(endpoint, vlan, trace):
    """Compare an endpoint with a trace step.

    Args:
        endpoint: Object exposing ``switch.dpid`` and ``port_number``.
        vlan: VLAN expected at this step; a falsy value skips the check.
        trace: Mapping with ``"dpid"`` and ``"port"`` keys and, optionally,
            a ``"vlan"`` key.

    Returns:
        The result of comparing dpid and port, further combined with the
        VLAN comparison when ``vlan`` is truthy and the step has a
        ``"vlan"`` key.
    """
    vlan_is_checked = vlan and "vlan" in trace
    same_hop = (
        endpoint.switch.dpid == trace["dpid"]
        and endpoint.port_number == trace["port"]
    )
    if vlan_is_checked:
        return same_hop and vlan == trace["vlan"]
    return same_hop
|
55
|
|
|
|
|
56
|
|
|
|
|
57
|
1 |
|
def compare_uni_out_trace(uni, trace):
    """Check whether the trace output step matches the UNI attributes.

    Args:
        uni: Object exposing ``interface.port_number`` and ``user_tag``
            (falsy, or an object with a ``value`` attribute).
        trace: Mapping describing the last trace step.

    Returns:
        ``True`` when the step has no ``"out"`` key, ``False`` when
        ``"out"`` is not a dict, otherwise whether both the port and the
        VLAN under ``"out"`` match the UNI.
    """
    # keep compatibility for old versions of sdntrace-cp
    if "out" not in trace:
        return True
    if not isinstance(trace["out"], dict):
        return False

    out_step = trace["out"]
    expected_vlan = None
    if uni.user_tag:
        expected_vlan = uni.user_tag.value
    port_matches = uni.interface.port_number == out_step.get("port")
    return port_matches and expected_vlan == out_step.get("vlan")
|
69
|
|
|
|
|
70
|
|
|
|
|
71
|
1 |
|
def uni_to_str(uni):
    """Render a UNI as ``dpid:port`` with ``:vlan`` appended when tagged.

    Args:
        uni: Object exposing ``interface.switch.dpid``,
            ``interface.port_number`` and ``user_tag`` (falsy, or an
            object with a ``value`` attribute).

    Returns:
        str: The colon-separated representation of the UNI.
    """
    interface = uni.interface
    pieces = [str(interface.switch.dpid), str(interface.port_number)]
    if uni.user_tag:
        pieces.append(str(uni.user_tag.value))
    return ":".join(pieces)
|
79
|
|
|
|
|
80
|
|
|
|
|
81
|
1 |
|
def load_spec():
    """Load, validate and build the OpenAPI spec shipped next to this file.

    Reads ``openapi.yml`` from the directory containing this module,
    checks the parsed document with ``validate_spec`` and returns the
    result of ``create_spec``.
    """
    spec_path = Path(__file__).parent / "openapi.yml"
    spec_dict, _unused = read_from_filename(spec_path)

    validate_spec(spec_dict)

    return create_spec(spec_dict)
|
90
|
|
|
|
|
91
|
|
|
|
|
92
|
1 |
|
def validate(spec):
    """Decorator to validate a REST endpoint input.

    Uses the schema defined in the openapi.yml file
    to validate.

    Args:
        spec: OpenAPI spec object handed to ``RequestValidator``.

    Returns:
        A decorator. The decorated function is invoked with the parsed
        JSON request body passed as the ``data`` keyword argument.

    Raises:
        BadRequest: Raised by the wrapper when the body is not
            well-formed JSON or when the request validation reports
            errors.
        UnsupportedMediaType: Raised by the wrapper when
            ``request.get_json()`` returns ``None``.
    """

    def validate_decorator(func):
        # Debug logs name the endpoint actually being validated instead of
        # a hard-coded one.
        endpoint_name = getattr(func, "__name__", "endpoint")

        @functools.wraps(func)
        def wrapper_validate(*args, **kwargs):
            try:
                data = request.get_json()
            except BadRequest as err:
                result = "The request body is not a well-formed JSON."
                log.debug("%s result %s %s", endpoint_name, result, 400)
                # Chain from the caught instance so the parse failure is
                # preserved as the cause.
                raise BadRequest(result) from err
            if data is None:
                result = "The request body mimetype is not application/json."
                log.debug("%s result %s %s", endpoint_name, result, 415)
                raise UnsupportedMediaType(result)

            validator = RequestValidator(spec)
            openapi_request = FlaskOpenAPIRequest(request)
            result = validator.validate(openapi_request)
            if result.errors:
                error_response = "The request body contains invalid API data."
                first_error = result.errors[0]
                # Only errors carrying a non-empty ``schema_errors`` can be
                # detailed; anything else keeps the generic message.
                schema_errors = getattr(first_error, "schema_errors", None)
                if schema_errors:
                    schema_error = schema_errors[0]
                    error_log = {
                        "error_message": schema_error.message,
                        "error_validator": schema_error.validator,
                        "error_validator_value": schema_error.validator_value,
                        "error_path": list(schema_error.path),
                        "error_schema": schema_error.schema,
                        "error_schema_path": list(schema_error.schema_path),
                    }
                    log.debug("Invalid request (API schema): %s", error_log)
                    field_path = "/".join(map(str, schema_error.path))
                    error_response += f" {schema_error.message} for field"
                    error_response += f" {field_path}."
                # No exception is being handled here, so there is nothing
                # to chain from.
                raise BadRequest(error_response)
            return func(*args, data=data, **kwargs)

        return wrapper_validate

    return validate_decorator
|
142
|
|
|
|