Passed
Pull Request — master (#114)
by Aldo
03:58
created

build.tracing.trace_manager   A

Complexity

Total Complexity 38

Size/Duplication

Total Lines 327
Duplicated Lines 0 %

Test Coverage

Coverage 89.63%

Importance

Changes 0
Metric Value
eloc 138
dl 0
loc 327
ccs 121
cts 135
cp 0.8963
rs 9.36
c 0
b 0
f 0
wmc 38

20 Methods

Rating   Name   Duplication   Size   Complexity  
B TraceManager._run_traces() 0 29 7
A TraceManager.number_pending_requests() 0 7 1
A TraceManager.queue_probe_packet() 0 21 1
A TraceManager.__init__() 0 28 1
A TraceManager.rest_list_results() 0 7 1
A TraceManager.get_id() 0 9 1
A TraceManager.rest_new_trace() 0 22 3
A TraceManager.is_entry_valid() 0 28 4
A TraceManager.limit_traces_reached() 0 12 2
A TraceManager.get_results() 0 8 1
A TraceManager.add_result() 0 8 1
A TraceManager.is_tracing_running() 0 2 1
A TraceManager.rest_get_result() 0 7 1
A TraceManager.get_result() 0 16 4
A TraceManager.new_trace() 0 18 1
A TraceManager.rest_list_stats() 0 17 1
A TraceManager.run_traces() 0 9 1
A TraceManager.stop_traces() 0 4 2
A TraceManager._spawn_trace() 0 14 1
A TraceManager.avoid_duplicated_request() 0 14 3
1
"""
2
    Trace Manager Class
3
"""
4
5
6 1
import time
7 1
import dill
8 1
import asyncio
9 1
from _thread import start_new_thread as new_thread
10
11 1
from kytos.core import log
12 1
from napps.amlight.sdntrace import settings
13 1
from napps.amlight.sdntrace.shared.switches import Switches
14 1
from napps.amlight.sdntrace.shared.colors import Colors
15 1
from napps.amlight.sdntrace.tracing.tracer import TracePath
16 1
from napps.amlight.sdntrace.tracing.trace_pkt import process_packet
17 1
from napps.amlight.sdntrace.tracing.trace_entries import TraceEntries
18
19
20 1
class TraceManager(object):
    """Manage the life cycle of all trace requests.

    Requests are queued by ``new_trace``, picked up by the ``_run_traces``
    coroutine, executed by ``_spawn_trace`` (one asyncio task per trace) and
    their results are stored through ``add_result``.
    """

    def __init__(self, controller):
        """Initialize the TraceManager and schedule the trace loop.

        ``run_traces`` is called at the end of the constructor and relies on
        ``asyncio.get_running_loop()``, so the instance has to be created
        while an event loop is running.

        Args:
            controller: Kytos controller object (stored as-is).
        """
        # Controller
        self.controller = controller

        # Trace ID used to distinguish each trace
        self._id = 30000

        # Trace queues, all keyed by trace id
        self._request_queue = dict()
        self._results_queue = dict()
        self._running_traces = dict()

        # Counters
        self._total_traces_requested = 0

        # PacketIn queue with Probes
        self.trace_pkt_in = []

        self._is_tracing_running = True

        # Tasks that are still pending; finished ones remove themselves.
        self.tasks: list[asyncio.Task] = []
        self._async_loop = None

        # To start traces
        self.run_traces(settings.TRACE_INTERVAL)

    def stop_traces(self):
        """Stop the trace loop and cancel every task still being tracked."""
        self._is_tracing_running = False
        # Iterate over a snapshot: tasks drop out of self.tasks once done.
        for task in list(self.tasks):
            task.cancel()

    def is_tracing_running(self):
        """Return True while the trace loop is allowed to keep running."""
        return self._is_tracing_running

    def _track_task(self, task):
        """Remember a task so stop_traces can cancel it while it is pending."""
        self.tasks.append(task)
        task.add_done_callback(self._forget_task)

    def _forget_task(self, task):
        """Drop a finished task so self.tasks does not grow without bound."""
        try:
            self.tasks.remove(task)
        except ValueError:
            # Already removed; nothing left to clean up.
            pass

    def run_traces(self, trace_interval):
        """Create the task that runs the ``_run_traces`` loop.

        Args:
            trace_interval: sleeping time between two scans of the
                request queue, forwarded to ``_run_traces``.

        Raises:
            RuntimeError: raised by ``asyncio.get_running_loop()`` when no
                event loop is running.
        """
        self._async_loop = asyncio.get_running_loop()
        task = self._async_loop.create_task(
            self._run_traces(trace_interval)
        )
        self._track_task(task)

    async def _run_traces(self, trace_interval):
        """Coroutine that keeps reading ``self._request_queue`` looking
        for new trace requests to run.

        Args:
            trace_interval: sleeping time between two scans of the queue
        """
        while self.is_tracing_running():
            if self.number_pending_requests() > 0:
                try:
                    # Snapshot the ids: the queue is mutated inside the loop.
                    for req_id in list(self._request_queue):
                        if self.limit_traces_reached():
                            break
                        # Dequeue the request as soon as it is started so a
                        # later failure in this pass cannot cause it to be
                        # spawned a second time on the next pass.
                        entries = self._request_queue.pop(req_id)
                        self._running_traces[req_id] = entries
                        task = self._async_loop.create_task(
                            self._spawn_trace(req_id, entries)
                        )
                        self._track_task(task)
                except Exception as error:  # pylint: disable=broad-except
                    log.error("Trace Error: %s", error)
            await asyncio.sleep(trace_interval)

    async def _spawn_trace(self, trace_id, trace_entries):
        """Instantiate a TracePath for a request and run the tracepath.

        Args:
            trace_id: trace request id
            trace_entries: TraceEntries class
        """
        log.info("Creating task to trace request id %s...", trace_id)
        try:
            tracer = TracePath(self, trace_id, trace_entries)
            await tracer.tracepath()
        finally:
            # Always release the running slot, even when the trace fails or
            # the task is cancelled; otherwise limit_traces_reached() could
            # stay True forever.
            self._running_traces.pop(trace_id, None)

    def add_result(self, trace_id, result):
        """Save a trace result to ``self._results_queue``.

        Args:
            trace_id: trace ID
            result: trace result generated using tracer
        """
        self._results_queue[trace_id] = result

    def avoid_duplicated_request(self, entries):
        """Verify if any of the queued requests has the same entries.

        Args:
            entries: entries provided by user via REST.
        Return:
            True: if exists a similar request
            False: otherwise
        """
        return any(
            entries == queued
            for queued in list(self._request_queue.values())
        )

    @staticmethod
    def is_entry_valid(entries):
        """Validate all params provided, including if the switch/dpid
        requested exists.

        Args:
            entries: dictionary with user request
        Returns:
            TraceEntries class when the request is valid
            Error msg (str) otherwise
        """
        try:
            trace_entries = TraceEntries()
            trace_entries.load_entries(entries)
        except ValueError as msg:
            return str(msg)

        init_switch = Switches().get_switch(trace_entries.dpid)
        # A bool return value is treated as "switch not found".
        if isinstance(init_switch, bool):
            return "Unknown Switch"

        color = Colors().get_switch_color(init_switch.dpid)
        if len(color) == 0:
            return "Switch not Colored"

        # TODO: get Coloring API to confirm color_field

        return trace_entries

    def get_id(self):
        """ID generator for each trace. Useful in case of parallel requests.

        Returns:
            integer to be the new request/trace id
        """
        self._id += 1
        return self._id

    def get_result(self, trace_id):
        """Used by external apps to get a trace result using the trace ID.

        Args:
            trace_id: trace id; anything accepted by ``int()``.
        Returns:
            result from self._results_queue
            msg depending of the status (unknown, pending, or active);
            an id that cannot be converted to int is reported as unknown.
        """
        try:
            trace_id = int(trace_id)
        except (TypeError, ValueError):
            # A malformed id cannot match any trace.
            return {'msg': 'unknown trace id'}

        try:
            return self._results_queue[trace_id]
        except KeyError:
            if trace_id in self._running_traces:
                return {'msg': 'trace in process'}
            if trace_id in self._request_queue:
                return {'msg': 'trace pending'}
            return {'msg': 'unknown trace id'}

    def get_results(self):
        """Used by external apps to get all trace results. Useful
        to see all requests and results.

        Returns:
            the self._results_queue dict (the live object, not a copy)
        """
        return self._results_queue

    def limit_traces_reached(self):
        """Control the number of active traces running in parallel.
        Protects the switches and avoid DoS.

        Returns:
            True: if the number of traces running is equal/more
                than settings.PARALLEL_TRACES
            False: if it is not.
        """
        return len(self._running_traces) >= settings.PARALLEL_TRACES

    def new_trace(self, trace_entries):
        """Receives external requests for traces.

        Args:
            trace_entries: TraceEntries Class
        Returns:
            int with the request/trace id
        """
        trace_id = self.get_id()

        # Add to request_queue
        self._request_queue[trace_id] = trace_entries

        # Statistics
        self._total_traces_requested += 1

        return trace_id

    def number_pending_requests(self):
        """Used to check if there are entries to be traced.

        Returns:
            length of self._request_queue
        """
        return len(self._request_queue)

    def queue_probe_packet(self, event, ethernet, in_port, switch):
        """Used by sdntrace.packet_in_handler. Only tracing probes
        get to this point. Adds the PacketIn msg received to the
        trace_pkt_in queue.

        Args:
            event: PacketIn msg
            ethernet: ethernet frame
            in_port: in_port
            switch: kytos.core.switch.Switch() class
        """
        # NOTE(security): dill.loads can execute arbitrary code while
        # deserializing. The bytes come from process_packet(ethernet), i.e.
        # from a received frame -- confirm only trusted probes reach here.
        pkt_in = {
            "dpid": switch.dpid,
            "in_port": in_port,
            "msg": dill.loads(process_packet(ethernet)),
            "ethernet": ethernet,
            "event": event,
        }

        # This queue stores all PacketIn message received
        self.trace_pkt_in.append(pkt_in)

    # REST calls

    def rest_new_trace(self, entries):
        """Used for the REST PUT call.

        Args:
            entries: user provided parameters to trace
        Returns:
            Trace_ID in JSON format
            Error msg if entries has invalid data
        """
        trace_entries = self.is_entry_valid(entries)
        if not isinstance(trace_entries, TraceEntries):
            # is_entry_valid returned an error message instead of entries.
            return {'result': {'error': trace_entries}}

        # NOTE(review): the raw `entries` is compared against the queued
        # TraceEntries objects stored by new_trace; verify that comparison
        # can actually match, otherwise duplicates are never detected.
        if self.avoid_duplicated_request(entries):
            return {'result': {'error': "Duplicated Trace Request ignored"}}

        trace_id = self.new_trace(trace_entries)
        return {'result': {'trace_id': trace_id}}

    def rest_get_result(self, trace_id):
        """Used for the REST GET call.

        Returns:
            get_result in JSON format
        """
        return self.get_result(trace_id)

    def rest_list_results(self):
        """Used for the REST GET call.

        Returns:
            get_results in JSON format
        """
        return self.get_results()

    def rest_list_stats(self):
        """Used to export some info about the TraceManager.

        Returns:
            dict with:
                number_of_requests: total number of requests
                number_of_running_traces: number of active traces
                number_of_pending_traces: number of pending traces
                list_of_pending_traces: see the note in the body
        """
        stats = {
            'number_of_requests': self._total_traces_requested,
            'number_of_running_traces': len(self._running_traces),
            'number_of_pending_traces': len(self._request_queue),
            # NOTE(review): this key exports the *results* queue, not the
            # pending requests its name suggests. Kept as-is because REST
            # clients may rely on it -- confirm the intended content.
            'list_of_pending_traces': self._results_queue,
        }

        return stats
327