Test Failed
Pull Request — master (#114)
by Aldo
03:48
created

build.tracing.trace_manager   A

Complexity

Total Complexity 39

Size/Duplication

Total Lines 331
Duplicated Lines 0 %

Test Coverage

Coverage 90.4%

Importance

Changes 0
Metric Value
eloc 142
dl 0
loc 331
ccs 113
cts 125
cp 0.904
rs 9.28
c 0
b 0
f 0
wmc 39

20 Methods

Rating   Name   Duplication   Size   Complexity  
B TraceManager._run_traces() 0 32 8
A TraceManager.number_pending_requests() 0 7 1
A TraceManager.queue_probe_packet() 0 21 1
A TraceManager.__init__() 0 28 1
A TraceManager.rest_list_results() 0 7 1
A TraceManager.get_id() 0 9 1
A TraceManager.rest_new_trace() 0 22 3
A TraceManager.is_entry_valid() 0 28 4
A TraceManager.limit_traces_reached() 0 12 2
A TraceManager.get_results() 0 8 1
A TraceManager.add_result() 0 8 1
A TraceManager.is_tracing_running() 0 2 1
A TraceManager.rest_get_result() 0 7 1
A TraceManager.get_result() 0 16 4
A TraceManager.new_trace() 0 18 1
A TraceManager.rest_list_stats() 0 17 1
A TraceManager.run_traces() 0 9 1
A TraceManager.stop_traces() 0 5 2
A TraceManager._spawn_trace() 0 14 1
A TraceManager.avoid_duplicated_request() 0 14 3
1
"""
2
    Trace Manager Class
3
"""
4
5
6 1
import time
7 1
import dill
8 1
import asyncio
9
from _thread import start_new_thread as new_thread
10 1
11 1
from kytos.core import log
12 1
from napps.amlight.sdntrace import settings
13 1
from napps.amlight.sdntrace.shared.switches import Switches
14 1
from napps.amlight.sdntrace.shared.colors import Colors
15 1
from napps.amlight.sdntrace.tracing.tracer import TracePath
16 1
from napps.amlight.sdntrace.tracing.trace_pkt import process_packet
17
from napps.amlight.sdntrace.tracing.trace_entries import TraceEntries
18
19 1
20
class TraceManager(object):
21
    """
22
        The TraceManager class is the class responsible to
23
        manage all trace requests.
24
    """
25 1
26
    def __init__(self, controller):
27
        """Initialization of the TraceManager class
28
        Args:
29
             controller = Kytos.core.controller object
30
        """
31 1
        # Controller
32
        self.controller = controller
33
34 1
        # Trace ID used to distinguish each trace
35
        self._id = 30000
36
37 1
        # Trace queues
38 1
        self._request_queue = dict()
39 1
        self._results_queue = dict()
40
        self._running_traces = dict()
41
42 1
        # Counters
43
        self._total_traces_requested = 0
44
45 1
        # PacketIn queue with Probes
46
        self.trace_pkt_in = []
47 1
48
        self._is_tracing_running = True
49
50 1
        self.tasks: list[asyncio.Task] = []
51
        self._async_loop = None
52 1
        # To start traces
53 1
        self.run_traces(settings.TRACE_INTERVAL)        
54
55 1
    async def stop_traces(self):
56
        self._is_tracing_running = False
57
        for task in self.tasks:
58 1
            task.cancel()
59
            await task
60
61
    def is_tracing_running(self):
62
        return self._is_tracing_running
63
    
64
    def run_traces(self, trace_interval):
65 1
        """
66 1
        Create the task to search for traces _run_traces.
67 1
        """
68 1
        self._async_loop = asyncio.get_running_loop()
69 1
        task = self._async_loop.create_task(
70 1
            self._run_traces(trace_interval)
71 1
        )
72 1
        self.tasks.append(task)
73 1
74 1
    async def _run_traces(self, trace_interval):
75
        """ Thread that will keep reading the self._request_queue
76
        queue looking for new trace requests to run.
77
78
        Args:
79 1
            trace_interval = sleeping time
80 1
        """
81
        try:
82
            while self.is_tracing_running():
83 1
                if self.number_pending_requests() > 0:
84
                    try:
85 1
                        new_request_ids = []
86
                        for req_id in self._request_queue.copy():
87
                            if not self.limit_traces_reached():
88
                                entries = self._request_queue[req_id]
89
                                self._running_traces[req_id] = entries
90
                                task = self._async_loop.create_task(
91
                                    self._spawn_trace(req_id, entries)
92
                                )
93 1
                                self.tasks.append(task)
94 1
                                new_request_ids.append(req_id)
95 1
                            else:
96
                                break
97 1
                        # After starting traces for new requests,
98
                        # remove them from self._request_queue
99 1
                        for rid in new_request_ids:
100
                            del self._request_queue[rid]
101
                    except Exception as error:  # pylint: disable=broad-except
102
                        log.error("Trace Error: %s" % error)
103
                await asyncio.sleep(trace_interval)
104
        except asyncio.CancelledError as err:
105
            log.warning("sdntrace is cancelling trace loopk up.")
106 1
107
    async def _spawn_trace(self, trace_id, trace_entries):
108 1
        """ Once a request is found by the run_traces method,
109
        instantiate a TracePath class and run the tracepath
110
111
        Args:
112
            trace_id: trace request id
113
            trace_entries: TraceEntries class
114
        """
115
        
116
        log.info("Creating task to trace request id %s..." % trace_id)
117
        tracer = TracePath(self, trace_id, trace_entries)
118 1
        await tracer.tracepath()
119 1
120 1
        del self._running_traces[trace_id]
121 1
122
    def add_result(self, trace_id, result):
123 1
        """Used to save trace results to self._results_queue
124 1
125
        Args:
126
            trace_id: trace ID
127
            result: trace result generated using tracer
128
        """
129
        self._results_queue[trace_id] = result
130
131
    def avoid_duplicated_request(self, entries):
132
        """Verify if any of the requested queries has the same entries.
133
        If so, ignore it
134 1
135 1
        Args:
136 1
            entries: entries provided by user via REST.
137 1
        Return:
138 1
            True: if exists a similar request
139
            False: otherwise
140 1
        """
141 1
        for request in self._request_queue.copy():
142 1
            if entries == self._request_queue[request]:
143 1
                return True
144
        return False
145 1
146 1
    @staticmethod
147
    def is_entry_valid(entries):
148
        """ This method validates all params provided, including
149
        if the switch/dpid requested exists.
150 1
151
        Args:
152 1
            entries: dictionary with user request
153
        Returns:
154
            TraceEntries class
155
            Error msg
156
        """
157
        try:
158
            trace_entries = TraceEntries()
159 1
            trace_entries.load_entries(entries)
160 1
        except ValueError as msg:
161
            return str(msg)
162 1
163
        init_switch = Switches().get_switch(trace_entries.dpid)
164
        if isinstance(init_switch, bool):
165
            return "Unknown Switch"
166
        color = Colors().get_switch_color(init_switch.dpid)
167
168
        if len(color) == 0:
169 1
            return "Switch not Colored"
170 1
171 1
        # TODO: get Coloring API to confirm color_field
172 1
173 1
        return trace_entries
174 1
175 1
    def get_id(self):
176 1
        """ID generator for each trace. Useful in case
177 1
        of parallel requests
178
179 1
        Returns:
180
            integer to be the new request/trace id
181
        """
182
        self._id += 1
183
        return self._id
184
185
    def get_result(self, trace_id):
186
        """Used by external apps to get a trace result using the trace ID
187
188 1
        Returns:
189
            result from self._results_queue
190
            msg depending of the status (unknown, pending, or active)
191
        """
192
        trace_id = int(trace_id)
193
        try:
194
            return self._results_queue[trace_id]
195
        except (ValueError, KeyError):
196
            if trace_id in self._running_traces:
197 1
                return {'msg': 'trace in process'}
198 1
            elif trace_id in self._request_queue:
199 1
                return {'msg': 'trace pending'}
200
            return {'msg': 'unknown trace id'}
201 1
202
    def get_results(self):
203
        """Used by external apps to get all trace results. Useful
204
        to see all requests and results
205
206
        Returns:
207
            list of results
208
        """
209
        return self._results_queue
210 1
211
    def limit_traces_reached(self):
212
        """ Control the number of active traces running in parallel. Protects the
213 1
        switches and avoid DoS.
214
215
        Returns:
216 1
            True: if the number of traces running is equal/more
217
                than settings.PARALLEL_TRACES
218 1
            False: if it is not.
219
        """
220 1
        if len(self._running_traces) >= settings.PARALLEL_TRACES:
221
            return True
222
        return False
223
224
    def new_trace(self, trace_entries):
225
        """Receives external requests for traces.
226 1
227
        Args:
228 1
            trace_entries: TraceEntries Class
229
        Returns:
230
            int with the request/trace id
231
        """
232
233
        trace_id = self.get_id()
234
235
        # Add to request_queue
236
        self._request_queue[trace_id] = trace_entries
237
238
        # Statistics
239
        self._total_traces_requested += 1
240
241
        return trace_id
242
243
    def number_pending_requests(self):
244
        """Used to check if there are entries to be traced
245
246
        Returns:
247
            length of self._request_queue
248
        """
249
        return len(self._request_queue)
250
251
    def queue_probe_packet(self, event, ethernet, in_port, switch):
252 1
        """Used by sdntrace.packet_in_handler. Only tracing probes
253
        get to this point. Adds the PacketIn msg received to the
254
        trace_pkt_in queue.
255
256
        Args:
257
            event: PacketIn msg
258
            ethernet: ethernet frame
259
            in_port: in_port
260
            switch: kytos.core.switch.Switch() class
261 1
        """
262 1
        pkt_in = dict()
263 1
264 1
        pkt_in["dpid"] = switch.dpid
265 1
        pkt_in["in_port"] = in_port
266
        pkt_in["msg"] = dill.loads(process_packet(ethernet))
267 1
        pkt_in["ethernet"] = ethernet
268 1
        pkt_in["event"] = event
269 1
270
        # This queue stores all PacketIn message received
271 1
        self.trace_pkt_in.append(pkt_in)
272 1
273 1
    # REST calls
274
275 1
    def rest_new_trace(self, entries):
276
        """Used for the REST PUT call
277
278
        Args:
279
            entries: user provided parameters to trace
280
        Returns:
281 1
            Trace_ID in JSON format
282
            Error msg if entries has invalid data
283 1
        """
284
        result = dict()
285
        trace_entries = self.is_entry_valid(entries)
286
        if not isinstance(trace_entries, TraceEntries):
287
            result['result'] = {'error': trace_entries}
288
            return result
289 1
290
        if self.avoid_duplicated_request(entries):
291 1
            result['result'] = {'error': "Duplicated Trace Request ignored"}
292
            return result
293
294
        trace_id = self.new_trace(trace_entries)
295
        result['result'] = {'trace_id': trace_id}
296
        return result
297
298
    def rest_get_result(self, trace_id):
299
        """Used for the REST GET call
300
301 1
        Returns:
302 1
            get_result in JSON format
303 1
        """
304 1
        return self.get_result(trace_id)
305 1
306
    def rest_list_results(self):
307 1
        """Used for the REST GET call
308
309
        Returns:
310
            get_results in JSON format
311
        """
312
        return self.get_results()
313
314
    def rest_list_stats(self):
315
        """ Used to export some info about the TraceManager.
316
        Total number of requests, number of active traces, number of
317
        pending traces, list of traces pending
318
        Returns:
319
                Total number of requests
320
                number of active traces
321
                number of pending traces
322
                list of traces pending
323
        """
324
        stats = dict()
325
        stats['number_of_requests'] = self._total_traces_requested
326
        stats['number_of_running_traces'] = len(self._running_traces)
327
        stats['number_of_pending_traces'] = len(self._request_queue)
328
        stats['list_of_pending_traces'] = self._results_queue
329
330
        return stats
331