TraceManager._run_traces()   B
last analyzed

Complexity

Conditions 7

Size

Total Lines 26
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 14
CRAP Score 7.2694

Importance

Changes 0
Metric Value
cc 7
eloc 17
nop 2
dl 0
loc 26
ccs 14
cts 17
cp 0.8235
crap 7.2694
rs 8
c 0
b 0
f 0
"""
    Trace Manager Class
"""
4
5
6 1
import time
7 1
import dill
8 1
from _thread import start_new_thread as new_thread
9
10 1
from kytos.core import log
11 1
from napps.amlight.sdntrace import settings
12 1
from napps.amlight.sdntrace.shared.switches import Switches
13 1
from napps.amlight.sdntrace.shared.colors import Colors
14 1
from napps.amlight.sdntrace.tracing.tracer import TracePath
15 1
from napps.amlight.sdntrace.tracing.trace_pkt import process_packet
16 1
from napps.amlight.sdntrace.tracing.trace_entries import TraceEntries
17
18
19 1
class TraceManager(object):
    """Manage the life cycle of all trace requests.

    Requests are queued by new_trace(), picked up by the background
    thread running _run_traces(), executed by _spawn_trace() and their
    results are stored through add_result().
    """

    def __init__(self, controller):
        """Initialization of the TraceManager class

        Starts the background thread that dispatches queued requests.

        Args:
             controller = Kytos.core.controller object
        """
        # Controller
        self.controller = controller

        # Trace ID used to distinguish each trace
        self._id = 30000

        # Trace queues, all keyed by trace id
        self._request_queue = dict()
        self._results_queue = dict()
        self._running_traces = dict()

        # Counters
        self._total_traces_requested = 0

        # PacketIn queue with Probes
        self.trace_pkt_in = []

        # Flag polled by _run_traces(); cleared by stop_traces()
        self._is_tracing_running = True

        # Thread to start traces
        new_thread(self._run_traces, (settings.TRACE_INTERVAL,))

    def stop_traces(self):
        """Ask the dispatcher loop in _run_traces() to stop."""
        self._is_tracing_running = False

    def is_tracing_running(self):
        """Return True while the dispatcher loop is allowed to run."""
        return self._is_tracing_running

    def _run_traces(self, trace_interval):
        """ Thread that will keep reading the self._request_queue
        queue looking for new trace requests to run.

        Args:
            trace_interval = sleeping time between two queue scans
        """
        while self.is_tracing_running():
            if self.number_pending_requests() > 0:
                try:
                    self._start_pending_traces()
                except Exception as error:  # pylint: disable=broad-except
                    # Keep the dispatcher alive whatever one batch raises.
                    log.error("Trace Error: %s", error)
            time.sleep(trace_interval)

    def _start_pending_traces(self):
        """Start one thread per pending request until the limit is hit."""
        # Iterate over a snapshot: new_trace() may add keys concurrently.
        for req_id in list(self._request_queue):
            if self.limit_traces_reached():
                break

            entries = self._request_queue[req_id]
            self._running_traces[req_id] = entries
            try:
                new_thread(self._spawn_trace, (req_id, entries,))
            except Exception:
                # The trace never started: free its slot and leave the
                # request queued so the next scan retries it.
                self._running_traces.pop(req_id, None)
                raise

            # Dequeue as soon as the trace is started, so a failure on a
            # later request cannot cause this one to be started twice.
            self._request_queue.pop(req_id, None)

    def _spawn_trace(self, trace_id, trace_entries):
        """ Once a request is found by the run_traces method,
        instantiate a TracePath class and run the tracepath

        Args:
            trace_id: trace request id
            trace_entries: TraceEntries class
        """
        log.info("Creating thread to trace request id %s...", trace_id)
        try:
            tracer = TracePath(self, trace_id, trace_entries)
            tracer.tracepath()
        finally:
            # Always release the slot; otherwise a failing trace would
            # count against settings.PARALLEL_TRACES forever.
            self._running_traces.pop(trace_id, None)

    def add_result(self, trace_id, result):
        """Used to save trace results to self._results_queue

        Args:
            trace_id: trace ID
            result: trace result generated using tracer
        """
        self._results_queue[trace_id] = result

    def avoid_duplicated_request(self, entries):
        """Verify if any of the requested queries has the same entries.
        If so, ignore it

        Args:
            entries: entries provided by user via REST.
        Return:
            True: if exists a similar request
            False: otherwise
        """
        # Compare against a snapshot of the values so a concurrent
        # dequeue by the dispatcher thread cannot raise KeyError.
        pending = list(self._request_queue.values())
        return any(entries == queued for queued in pending)

    @staticmethod
    def is_entry_valid(entries):
        """ This method validates all params provided, including
        if the switch/dpid requested exists.

        Args:
            entries: dictionary with user request
        Returns:
            TraceEntries class when the request is valid
            Error msg (str) otherwise
        """
        try:
            trace_entries = TraceEntries()
            trace_entries.load_entries(entries)
        except ValueError as msg:
            return str(msg)

        # A bool returned by get_switch() is treated as "not found".
        init_switch = Switches().get_switch(trace_entries.dpid)
        if isinstance(init_switch, bool):
            return "Unknown Switch"

        color = Colors().get_switch_color(init_switch.dpid)
        if not color:
            return "Switch not Colored"

        # TODO: get Coloring API to confirm color_field

        return trace_entries

    def get_id(self):
        """ID generator for each trace. Useful in case
        of parallel requests

        Returns:
            integer to be the new request/trace id
        """
        self._id += 1
        return self._id

    def get_result(self, trace_id):
        """Used by external apps to get a trace result using the trace ID

        Args:
            trace_id: trace ID, as an int or anything int() accepts
        Returns:
            result from self._results_queue
            msg depending of the status (unknown, pending, or active)
        """
        try:
            trace_id = int(trace_id)
        except (ValueError, TypeError):
            # An id that is not a number cannot match any trace.
            return {'msg': 'unknown trace id'}

        try:
            return self._results_queue[trace_id]
        except KeyError:
            if trace_id in self._running_traces:
                return {'msg': 'trace in process'}
            if trace_id in self._request_queue:
                return {'msg': 'trace pending'}
            return {'msg': 'unknown trace id'}

    def get_results(self):
        """Used by external apps to get all trace results. Useful
        to see all requests and results

        Returns:
            self._results_queue (dict of trace id -> result)
        """
        return self._results_queue

    def limit_traces_reached(self):
        """ Control the number of active traces running in parallel. Protects the
        switches and avoid DoS.

        Returns:
            True: if the number of traces running is equal/more
                than settings.PARALLEL_TRACES
            False: if it is not.
        """
        return len(self._running_traces) >= settings.PARALLEL_TRACES

    def new_trace(self, trace_entries):
        """Receives external requests for traces.

        Args:
            trace_entries: TraceEntries Class
        Returns:
            int with the request/trace id
        """
        trace_id = self.get_id()

        # Add to request_queue
        self._request_queue[trace_id] = trace_entries

        # Statistics
        self._total_traces_requested += 1

        return trace_id

    def number_pending_requests(self):
        """Used to check if there are entries to be traced

        Returns:
            length of self._request_queue
        """
        return len(self._request_queue)

    def queue_probe_packet(self, event, ethernet, in_port, switch):
        """Used by sdntrace.packet_in_handler. Only tracing probes
        get to this point. Adds the PacketIn msg received to the
        trace_pkt_in queue.

        Args:
            event: PacketIn msg
            ethernet: ethernet frame
            in_port: in_port
            switch: kytos.core.switch.Switch() class
        """
        # NOTE(review): dill.loads() can execute arbitrary code when fed
        # untrusted bytes; confirm what process_packet() returns for
        # frames received from the network.
        pkt_in = {
            "dpid": switch.dpid,
            "in_port": in_port,
            "msg": dill.loads(process_packet(ethernet)),
            "ethernet": ethernet,
            "event": event,
        }

        # This queue stores all PacketIn message received
        self.trace_pkt_in.append(pkt_in)

    # REST calls

    def rest_new_trace(self, entries):
        """Used for the REST PUT call

        Args:
            entries: user provided parameters to trace
        Returns:
            Trace_ID in JSON format
            Error msg if entries has invalid data
        """
        result = dict()
        trace_entries = self.is_entry_valid(entries)
        if not isinstance(trace_entries, TraceEntries):
            # is_entry_valid() returned an error message instead.
            result['result'] = {'error': trace_entries}
            return result

        # NOTE(review): the raw user entries are compared with the queued
        # TraceEntries objects; confirm that comparison is the intended one.
        if self.avoid_duplicated_request(entries):
            result['result'] = {'error': "Duplicated Trace Request ignored"}
            return result

        trace_id = self.new_trace(trace_entries)
        result['result'] = {'trace_id': trace_id}
        return result

    def rest_get_result(self, trace_id):
        """Used for the REST GET call

        Returns:
            get_result in JSON format
        """
        return self.get_result(trace_id)

    def rest_list_results(self):
        """Used for the REST GET call

        Returns:
            get_results in JSON format
        """
        return self.get_results()

    def rest_list_stats(self):
        """ Used to export some info about the TraceManager.

        Returns:
            dict with:
                number_of_requests: total number of requests
                number_of_running_traces: number of active traces
                number_of_pending_traces: number of pending traces
                list_of_pending_traces: ids of the traces pending
        """
        stats = dict()
        stats['number_of_requests'] = self._total_traces_requested
        stats['number_of_running_traces'] = len(self._running_traces)
        stats['number_of_pending_traces'] = len(self._request_queue)
        # Report the pending request ids, not the finished results.
        stats['list_of_pending_traces'] = list(self._request_queue)

        return stats
308