1
|
|
|
"""Automate circuit traces.""" |
2
|
|
|
|
3
|
1 |
|
import time |
4
|
|
|
|
5
|
1 |
|
import requests |
6
|
1 |
|
from kytos.core import KytosEvent, log |
7
|
1 |
|
from napps.amlight.sdntrace_cp import settings |
8
|
1 |
|
from napps.amlight.sdntrace_cp.scheduler import Scheduler |
9
|
1 |
|
from napps.amlight.sdntrace_cp.utils import (clean_circuits, format_result, |
10
|
|
|
get_stored_flows) |
11
|
1 |
|
from pyof.v0x04.common.port import PortNo as Port13 |
12
|
1 |
|
from requests.exceptions import ConnectTimeout |
13
|
|
|
|
14
|
|
|
|
15
|
1 |
|
class Automate: |
16
|
|
|
"""Find all circuits and automate trace execution.""" |
17
|
|
|
|
18
|
1 |
|
def __init__(self, tracer):
    """Set up the automation state.

    Args:
        tracer: object later used through ``tracer.tracepath(...)`` and
            ``tracer.controller``.
    """
    # Ids handed to the scheduler, kept so they can be removed later.
    self.ids = set()
    self._tracer = tracer
    self.scheduler = Scheduler()
22
|
|
|
|
23
|
|
|
# pylint: disable=too-many-nested-blocks, too-many-branches |
24
|
1 |
|
def find_circuits(self):
    """Discover all circuits in a topology.

    Using the list of stored flows from flow_manager, run control plane
    traces to find a list of circuits.

    A stored flow is used as the starting point of a trace when it matches
    on a non-zero ``in_port`` and has at least one ``output`` action whose
    port is not the controller port.

    Returns:
        The result of ``clean_circuits`` applied to the collected list of
        ``{'circuit': ..., 'entries': ...}`` dicts.
    """
    # Bound once, before the loop, so the name always exists whatever the
    # ``ofp_version`` of the first switch visited.
    controller_port = Port13.OFPP_CONTROLLER
    stored_flows = get_stored_flows()

    all_flows = {}
    for switch in self._tracer.controller.switches.copy().values():
        all_flows[switch] = []
        try:
            dpid = switch.dpid
            # A switch without stored flows contributes no circuits.
            candidates = stored_flows[dpid] if dpid in stored_flows else []
            for stored in candidates:
                flow = stored['flow']
                if 'match' not in flow:
                    continue
                match = flow['match']
                if 'in_port' not in match or match['in_port'] == 0:
                    continue
                forwards = any(
                    action['action_type'] == 'output'
                    and action['port'] != controller_port
                    for action in flow['actions']
                )
                if forwards:
                    all_flows[switch].append(flow)
        except AttributeError as exception:
            # Best effort: keep what was collected for this switch and
            # move on, but leave a trace of the problem in the log.
            log.debug('Skipping remaining flows of %s: %s',
                      switch, exception)

    circuits = []
    for switch, flows in all_flows.items():
        for flow in flows:
            match = flow['match']
            entries = {
                'trace': {
                    'switch': {
                        'dpid': switch.dpid,
                        'in_port': match['in_port']
                    },
                    'eth': {
                        # None when the flow does not match on a VLAN.
                        'dl_vlan': match.get('dl_vlan')
                    }
                }
            }
            result = self._tracer.tracepath(entries)
            circuits.append({'circuit': format_result(result),
                             'entries': entries})
    return clean_circuits(circuits, self._tracer.controller)
77
|
|
|
|
78
|
1 |
|
def run_traces(self):
    """Run traces for all circuits.

    Every circuit returned by ``find_circuits`` is traced again from its
    ``'entries'``.

    Returns:
        The circuits whose new formatted trace differs from the stored
        ``'circuit'`` value, plus those for which the comparison raised
        ``KeyError``.
    """
    known_circuits = self.find_circuits()
    changed = []
    for known in known_circuits:
        raw_trace = self._tracer.tracepath(known['entries'])
        try:
            differs = format_result(raw_trace) != known['circuit']
        except KeyError:
            differs = True
        if differs:
            changed.append(known)
    log.debug('Results %s, size %s', changed, len(known_circuits))
    return changed
94
|
|
|
|
95
|
1 |
|
def get_circuit(self, circuit): |
96
|
|
|
"""Find the given circuit in the list of circuits.""" |
97
|
1 |
|
circuits = self.find_circuits() |
98
|
1 |
|
for steps in circuits: |
99
|
1 |
|
endpoint_a = steps['circuit'][0] |
100
|
1 |
|
endpoint_z = steps['circuit'][-1] |
101
|
|
|
# pylint: disable=too-many-boolean-expressions |
102
|
1 |
|
if (circuit['dpid_a'] == endpoint_a['dpid'] and |
103
|
|
|
circuit['port_a'] == endpoint_a['in_port'] and |
104
|
|
|
circuit['vlan_a'] == endpoint_a['in_vlan'] and |
105
|
|
|
circuit['dpid_z'] == endpoint_z['dpid'] and |
106
|
|
|
circuit['port_z'] == endpoint_z['out_port'] and |
107
|
|
|
circuit['vlan_z'] == endpoint_z['out_vlan']): |
108
|
|
|
|
109
|
1 |
|
return steps['circuit'] |
110
|
1 |
|
return None |
111
|
|
|
|
112
|
1 |
|
def _check_trace(self, circuit, trace): |
113
|
1 |
|
steps = self.get_circuit(circuit) |
114
|
1 |
|
if steps: |
115
|
1 |
|
if len(steps) != len(trace) - 1: |
116
|
1 |
|
return False |
117
|
1 |
|
for i, step in enumerate(steps): |
118
|
1 |
|
if not self.check_step(step, trace[i]): |
119
|
1 |
|
return False |
120
|
|
|
else: |
121
|
1 |
|
return False |
122
|
1 |
|
return True |
123
|
|
|
|
124
|
1 |
|
def run_important_traces(self):
    """Run SDNTrace in data plane for important circuits as defined
    by user.

    For every entry of ``settings.IMPORTANT_CIRCUITS`` a trace is
    requested with a PUT to ``settings.SDNTRACE_URL`` and its result is
    polled every 5 seconds until the last step has type ``'last'``. When
    ``_check_trace`` returns False for that result, an
    ``amlight/kytos_courier.slack_send`` event is put in the controller's
    app buffer.

    A circuit whose request times out is logged and skipped. Nothing is
    done when ``settings.IMPORTANT_CIRCUITS`` is not defined.
    """
    try:
        important_circuits = settings.IMPORTANT_CIRCUITS
    except AttributeError:
        return

    for circuit in important_circuits:
        entries = {
            'trace': {
                'switch': {
                    'dpid': circuit['dpid_a'],
                    'in_port': circuit['port_a']
                },
                'eth': {
                    'dl_vlan': circuit['vlan_a']
                }
            }
        }
        try:
            response = requests.put(
                settings.SDNTRACE_URL,
                json=entries,
                timeout=30)
        except ConnectTimeout as exception:
            log.error(f"Request has timed out: {exception}")
            # Without a response there is no trace id to poll.
            continue
        trace_id = response.json()['result']['trace_id']

        trace = None
        step_type = None
        while step_type != 'last':
            time.sleep(5)
            try:
                response = requests.get(
                    f'{settings.SDNTRACE_URL}/{trace_id}',
                    timeout=30)
            except ConnectTimeout as exception:
                log.error(f"Request has timed out: {exception}")
                # Give up on this circuit instead of re-parsing the
                # previous (stale) response.
                trace = None
                break
            trace = response.json()
            step_type = trace['result'][-1]['type']
        if trace is None:
            continue

        if self._check_trace(circuit, trace['result']) is False:
            # A fresh event and message per alert: events already queued
            # must not be altered by the alert of a later circuit.
            event = KytosEvent(name='amlight/kytos_courier.slack_send')
            event.content['message'] = {
                'channel': settings.SLACK_CHANNEL,
                'source': 'amlight/sdntrace_cp',
                'm_body': 'Trace in data plane different from '
                          'trace in control plane for circuit '
                          f'{circuit}',
            }
            self._tracer.controller.buffers.app.put(event)
177
|
|
|
|
178
|
1 |
|
def schedule_id(self, id_): |
179
|
|
|
"""Keep track of scheduled ids""" |
180
|
1 |
|
if not isinstance(id_, str): |
181
|
1 |
|
raise AttributeError("Invalid id type, for schedule.") |
182
|
1 |
|
self.ids.add(id_) |
183
|
1 |
|
return id_ |
184
|
|
|
|
185
|
1 |
|
def schedule_traces(self, settings_=settings):
    """Schedule ``run_traces`` according to the given settings.

    Args:
        settings_: object providing ``TRIGGER_SCHEDULE_TRACES``,
            ``SCHEDULE_TRIGGER`` (str) and ``SCHEDULE_ARGS`` (dict).

    Returns:
        ``self.scheduler.get_job('automatic_traces')`` once the job has
        been added, or None when ``TRIGGER_SCHEDULE_TRACES`` is falsy.

    Raises:
        AttributeError: when ``SCHEDULE_ARGS`` is not a dict or
            ``SCHEDULE_TRIGGER`` is not a str.
    """
    if not settings_.TRIGGER_SCHEDULE_TRACES:
        return None

    trigger = settings_.SCHEDULE_TRIGGER
    kwargs = settings_.SCHEDULE_ARGS
    if not isinstance(kwargs, dict) or not isinstance(trigger, str):
        raise AttributeError("Invalid attributes for "
                             "job to be scheduled.")

    # Track the id only after validation, so a rejected configuration
    # does not leave behind an id that has no job.
    id_ = self.schedule_id('automatic_traces')
    # Entries of SCHEDULE_ARGS take precedence over the 'trigger' key.
    job_kwargs = {'trigger': trigger, **kwargs}
    self.scheduler.add_callable(id_, self.run_traces, **job_kwargs)
    return self.scheduler.get_job(id_)
202
|
|
|
|
203
|
1 |
|
def schedule_important_traces(self, settings_=settings):
    """Schedule ``run_important_traces`` according to the given settings.

    Args:
        settings_: object providing ``TRIGGER_IMPORTANT_CIRCUITS``,
            ``IMPORTANT_CIRCUITS_TRIGGER`` (str) and
            ``IMPORTANT_CIRCUITS_ARGS`` (dict).

    Returns:
        Whatever ``self.scheduler.add_callable`` returns, or None when
        ``TRIGGER_IMPORTANT_CIRCUITS`` is falsy.

    Raises:
        AttributeError: when ``IMPORTANT_CIRCUITS_ARGS`` is not a dict or
            ``IMPORTANT_CIRCUITS_TRIGGER`` is not a str.
    """
    if not settings_.TRIGGER_IMPORTANT_CIRCUITS:
        return None

    trigger = settings_.IMPORTANT_CIRCUITS_TRIGGER
    kwargs = settings_.IMPORTANT_CIRCUITS_ARGS
    if not isinstance(kwargs, dict) or not isinstance(trigger, str):
        raise AttributeError("Invalid attributes for "
                             "job to be scheduled.")

    # Track the id only after validation, so a rejected configuration
    # does not leave behind an id that has no job.
    id_ = self.schedule_id('automatic_important_traces')
    # Entries of IMPORTANT_CIRCUITS_ARGS take precedence over 'trigger'.
    job_kwargs = {'trigger': trigger, **kwargs}
    # NOTE(review): schedule_traces returns scheduler.get_job(id_) here,
    # while this method returns add_callable's result - confirm intended.
    return self.scheduler.add_callable(id_,
                                       self.run_important_traces,
                                       **job_kwargs)
220
|
|
|
|
221
|
1 |
|
def unschedule_ids(self, id_set=None): |
222
|
|
|
"""Remove ids to be unschedule""" |
223
|
1 |
|
if id_set is None: |
224
|
1 |
|
id_set = self.ids.copy() |
225
|
1 |
|
while id_set: |
226
|
1 |
|
id_ = id_set.pop() |
227
|
1 |
|
self.ids.discard(id_) |
228
|
1 |
|
self.scheduler.remove_job(id_) |
229
|
|
|
|
230
|
1 |
|
def sheduler_shutdown(self, wait): |
231
|
|
|
"""Shutdown scheduler""" |
232
|
|
|
self.scheduler.shutdown(wait) |
233
|
|
|
|
234
|
1 |
|
@staticmethod |
235
|
1 |
|
def check_step(circuit_step, trace_step): |
236
|
|
|
"""Check if a step in SDNTrace in data plane is what it should""" |
237
|
1 |
|
return (circuit_step['dpid'] == trace_step['dpid'] and |
238
|
|
|
circuit_step['in_port'] == trace_step['port']) |
239
|
|
|
|