Passed
Pull Request — master (#42)
by Jose
03:04
created

build.stats   D

Complexity

Total Complexity 58

Size/Duplication

Total Lines 411
Duplicated Lines 0 %

Test Coverage

Coverage 56.11%

Importance

Changes 0
Metric Value
wmc 58
eloc 229
dl 0
loc 411
ccs 101
cts 180
cp 0.5611
rs 4.5599
c 0
b 0
f 0

23 Methods

Rating   Name   Duplication   Size   Complexity  
A Stats.listen() 0 3 1
A Stats.__init__() 0 9 1
A Stats.request() 0 3 1
A Stats._send_event() 0 5 1
A FlowStats.listen() 0 30 3
A RRD.create_rrd() 0 22 3
B RRD.fetch() 0 48 7
A PortStats.request() 0 5 1
A PortStats.listen() 0 30 2
A FlowStats.request() 0 5 1
A PortStats._get_versioned_request() 0 9 2
A Stats._save_event_callback() 0 6 2
B RRD.fetch_latest() 0 22 6
A RRD.update() 0 18 3
A RRD.get_rrd() 0 22 2
A PortStats._update_controller_interface() 0 8 3
A RRD._get_archives() 0 11 2
A AggregateStats.listen() 0 16 2
A RRD.__init__() 0 11 1
A RRD.get_or_create_rrd() 0 21 4
B RRD._calc_start_end() 0 19 7
A FlowStats._get_versioned_request() 0 9 2
A AggregateStats.request() 0 7 1

How to fix   Complexity   

Complexity

Complex classes like build.stats often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Module with Classes to handle statistics."""
2 1
import time
3 1
from abc import ABCMeta, abstractmethod
4 1
from pathlib import Path
5
6 1
import pyof.v0x01.controller2switch.common as v0x01
7 1
import rrdtool
8
# pylint: disable=C0411,C0412
9 1
from pyof.v0x01.common.phy_port import Port
10 1
from pyof.v0x01.controller2switch.common import AggregateStatsRequest
11 1
from pyof.v0x01.controller2switch.stats_request import StatsRequest, StatsType
12 1
from pyof.v0x04.controller2switch import multipart_request as v0x04
13 1
from pyof.v0x04.controller2switch.common import MultipartType
14 1
from pyof.v0x04.controller2switch.multipart_request import MultipartRequest
15
16 1
from kytos.core import KytosEvent, log
17
# v0x01 and v0x04 PortStats are version independent
18 1
from napps.kytos.of_core.flow import FlowFactory
19 1
from napps.kytos.of_core.flow import PortStats as OFCorePortStats
20 1
from napps.kytos.of_stats import settings
21
22
23 1
class Stats(metaclass=ABCMeta):
    """Abstract base for the statistics handlers of this module.

    Concrete subclasses must implement :meth:`request` and :meth:`listen`.
    """

    # Class-level placeholder; this base class never assigns anything else.
    rrd = None

    def __init__(self, msg_out_buffer, msg_app_buffer):
        """Keep references to the two buffers events are put into.

        Args:
            msg_out_buffer: Buffer that receives the stats request events
                built by :meth:`_send_event`.
            msg_app_buffer: Buffer stored as ``_app_buffer`` for use by
                subclasses.

        """
        self._buffer, self._app_buffer = msg_out_buffer, msg_app_buffer

    @abstractmethod
    def request(self, conn):
        """Request statistics through connection *conn*."""

    @abstractmethod
    def listen(self, switch, stats):
        """Handle the statistics replies *stats* received for *switch*."""

    def _send_event(self, req, conn):
        """Wrap *req* in a stats request event and put it in the out buffer.

        Args:
            req: Message stored under the ``message`` key of the event.
            conn: Connection stored under the ``destination`` key.

        """
        payload = {'message': req, 'destination': conn}
        self._buffer.put(KytosEvent(
            name='kytos/of_stats.messages.out.ofpt_stats_request',
            content=payload))

    @classmethod
    def _save_event_callback(cls, _event, data, error):
        """Log the outcome of a save request; used as event callback.

        Args:
            _event: Ignored.
            data: Logged at debug level, whether or not *error* is set.
            error: When truthy, it is reported at error level.

        """
        if error:
            log.error(f"Can't save stats in kytos/kronos: {error}")
        log.debug(data)
58
59
60 1
class RRD:
    """Round-robin database for keeping stats.

    It stores statistics every :data:`STATS_INTERVAL`.
    """

    def __init__(self, app_folder, data_sources):
        """Specify a folder to store RRDs.

        Args:
            app_folder (str): Parent folder for dpids folders.
            data_sources (iterable): Data source names (e.g. tx_bytes,
                rx_bytes).

        """
        self._app = app_folder
        self._ds = data_sources

    def update(self, index, tstamp=None, **ds_values):
        """Add a row to the RRD file identified by *index*.

        The RRD file is created first if it does not exist yet.

        Args:
            index (list of str): Index for the RRD database. Examples:
                [dpid], [dpid, port_no], [dpid, table id, flow hash].
            tstamp (str, int): Unix timestamp in seconds. Defaults to now.
            **ds_values: One value for each data source name given to the
                constructor; extra keys are ignored.

        Raises:
            KeyError: If a data source has no value in *ds_values*.

        """
        if tstamp is None:
            tstamp = 'N'
        rrd = self.get_or_create_rrd(index)
        # Values must follow the order in which the data sources were declared.
        data = ':'.join(str(ds_values[ds]) for ds in self._ds)
        with settings.RRD_LOCK:
            rrdtool.update(rrd, '{}:{}'.format(tstamp, data))

    def get_rrd(self, index):
        """Return the path of the RRD file for *index*.

        If rrd doesn't exist, it is *not* created.

        Args:
            index (iterable of str): Index for the RRD database. Examples:
                [dpid], [dpid, port_no], [dpid, table id, flow hash].
                All items but the last one become folders; the last one is
                the file name (without the ``.rrd`` extension).

        Returns:
            str: RRD path, built from ``settings.DIR`` and the app folder.

        See Also:
            :meth:`get_or_create_rrd`

        """
        path = settings.DIR / self._app
        folders, basename = index[:-1], index[-1]
        for folder in folders:
            path = path / folder
        path = path / '{}.rrd'.format(basename)
        return str(path)

    def get_or_create_rrd(self, index, tstamp=None):
        """Return the RRD path for *index*, creating the file if needed.

        Args:
            index (list of str): Index for the RRD database. Examples:
                [dpid], [dpid, port_no], [dpid, table id, flow hash].
            tstamp (str, int): Value for start argument of RRD creation.

        Returns:
            str: Path of the (possibly just created) RRD file.

        """
        if tstamp is None:
            tstamp = 'N'

        rrd = self.get_rrd(index)
        rrd_path = Path(rrd)
        if not rrd_path.exists():
            log.debug('Creating rrd for app %s, index %s.', self._app, index)
            # exist_ok makes a separate existence check unnecessary and
            # tolerates another caller creating the folder concurrently.
            rrd_path.parent.mkdir(parents=True, exist_ok=True)
            self.create_rrd(rrd, tstamp)
        return rrd

    def create_rrd(self, rrd, tstamp=None):
        """Create an RRD file.

        Args:
            rrd (str): Path of RRD file to be created.
            tstamp (str, int): Unix timestamp in seconds for RRD creation.
                Defaults to now.

        """
        def get_counter(ds_value):
            """Return a COUNTER data source definition for rrd creation."""
            return 'DS:{}:COUNTER:{}:{}:{}'.format(ds_value, settings.TIMEOUT,
                                                   settings.MIN, settings.MAX)

        if tstamp is None:
            tstamp = 'N'
        options = [rrd, '--start', str(tstamp), '--step',
                   str(settings.STATS_INTERVAL)]
        options.extend(get_counter(ds) for ds in self._ds)
        options.extend(self._get_archives())
        with settings.RRD_LOCK:
            rrdtool.create(*options)

    # pylint: disable=R0914
    def fetch(self, index, start=None, end=None, n_points=None):
        """Fetch average values from rrd.

        Args:
            index (list of str): Index for the RRD database. Examples:
                [dpid], [dpid, port_no], [dpid, table id, flow hash].
            start (str, int): Unix timestamp in seconds for the first stats.
                Defaults to be old enough to have the latest n_points
                available (now - n_points * settings.STATS_INTERVAL) or,
                when n_points is not given either, to the first timestamp
                stored in the RRD file.
            end (str, int): Unix timestamp in seconds for the last stats.
                Defaults to current time.
            n_points (int): Number of points to return. May return more if
                there is no matching resolution in the RRD file, or less if
                there is no records for all the time range.
                Defaults to as many points as possible.

        Returns:
            A tuple with:

            1. Iterator over timestamps
            2. Column (DS) names
            3. List of rows as tuples

        Raises:
            FileNotFoundError: If there is no RRD file for *index*.

        """
        rrd = self.get_rrd(index)
        if not Path(rrd).exists():
            msg = 'RRD for app {} and index {} not found'.format(self._app,
                                                                 index)
            raise FileNotFoundError(msg)

        # Use integers to calculate resolution
        start, end = self._calc_start_end(start, end, n_points, rrd)

        # Find the best matching resolution for returning n_points.
        # A resolution is only computed for a positive n_points (avoiding a
        # division by zero) and for integer boundaries.
        res_args = []
        if n_points is not None and n_points > 0 \
                and isinstance(start, int) and isinstance(end, int):
            resolution = (end - start) // n_points
            if resolution > 0:
                res_args.extend(['-a', '-r', '{}s'.format(resolution)])

        args = [rrd, 'AVERAGE', '--start', str(start), '--end', str(end)]
        args.extend(res_args)
        with settings.RRD_LOCK:
            tstamps, cols, rows = rrdtool.fetch(*args)
        start, stop, step = tstamps
        # rrdtool range is different from Python's.
        return range(start + step, stop + 1, step), cols, rows

    @staticmethod
    def _calc_start_end(start, end, n_points, rrd):
        """Calculate start and end values for fetch command.

        Args:
            start (str, int): Requested start. ``None`` means the latest
                *n_points* or, if *n_points* is also ``None``, the first
                timestamp of the RRD file. ``'first'`` also selects the
                first timestamp of the RRD file.
            end (str, int): Requested end. ``None`` means the current time.
            n_points (int): Number of points wanted, or ``None``.
            rrd (str): Path of the RRD file (only read for ``'first'``).

        Returns:
            tuple: ``(start, end)``. Integer values are decremented by one;
            other values (e.g. rrdtool time expressions) are left as is.

        """
        # Use integers to calculate resolution
        if end is None:
            end = int(time.time())
        if start is None and n_points is None:
            # Without n_points there is no window size to subtract from end,
            # so ask for everything the file has.
            start = 'first'
        if start is None:  # Latest n_points
            start = end - n_points * settings.STATS_INTERVAL
        elif start == 'first':  # Usually empty because 'first' is too old
            with settings.RRD_LOCK:
                start = rrdtool.first(rrd)

        # For RRDtool to include start and end timestamps.
        if isinstance(start, int):
            start -= 1
        if isinstance(end, int):
            end -= 1

        return start, end

    def fetch_latest(self, index):
        """Fetch only the value for now.

        Returns:
            dict: Data source names mapped to the values of the most recent
            non-null row that is newer than two stats intervals. All values
            are zero if there is no such row. The dict is empty if there is
            no RRD file for *index*.

        """
        start = 'end-{}s'.format(settings.STATS_INTERVAL * 3)
        try:
            tstamps, cols, rows = self.fetch(index, start, end='now')
        except FileNotFoundError:
            # No RRD file for this index: there is nothing to report.
            return {}
        # Last rows may have future timestamp and be empty
        latest = None
        min_tstamp = int(time.time()) - settings.STATS_INTERVAL * 2
        # Search backwards and stop at the first (i.e. most recent) non-null
        # row; going on would overwrite it with older rows.
        for tstamp, row in zip(reversed(tstamps), reversed(rows)):
            if row[0] is not None and tstamp > min_tstamp:
                latest = row
                break
        # If no values are found, add zeros.
        if not latest:
            latest = [0] * len(cols)
        return dict(zip(cols, latest))

    @classmethod
    def _get_archives(cls):
        """Return one AVERAGE archive definition for each resolution.

        Returns:
            list of str: ``RRA:AVERAGE`` definitions, one for each step
            below, all with ``settings.XFF`` and ``settings.PERIOD``.

        """
        # Every item needs its trailing comma: adjacent string literals are
        # silently concatenated (e.g. '4h' '8h' becomes '4h8h').
        resolutions = ('1m', '2m', '4m', '8m', '15m', '30m', '1h', '2h',
                       '4h', '8h', '12h', '1d', '2d', '3d', '6d', '10d',
                       '15d')
        return ['RRA:AVERAGE:{}:{}:{}'.format(settings.XFF, steps,
                                              settings.PERIOD)
                for steps in resolutions]
269
270
271 1
class PortStats(Stats):
    """Request port statistics and handle their replies."""

    def request(self, conn):
        """Send a port stats request through connection *conn*."""
        stats_request = self._get_versioned_request(conn.protocol.version)
        self._send_event(stats_request, conn)
        log.debug('PortStats request for switch %s sent.', conn.switch.id)

    @staticmethod
    def _get_versioned_request(of_version):
        """Build the port stats request message for *of_version*.

        Version ``0x01`` gets a ``StatsRequest``; any other version gets a
        ``MultipartRequest``.
        """
        if of_version != 0x01:
            return MultipartRequest(
                multipart_type=MultipartType.OFPMP_PORT_STATS,
                body=v0x04.PortStatsRequest())
        return StatsRequest(
            body_type=StatsType.OFPST_PORT,
            body=v0x01.PortStatsRequest(Port.OFPP_NONE))  # All ports

    def listen(self, switch, ports_stats):
        """Handle the port stats replies received from *switch*.

        For each item of *ports_stats*, the matching controller interface
        is updated and a ``kytos.kronos.save`` event carrying the counters
        is put in the app buffer.
        """
        debug_msg = ('Received port %s stats of switch %s: rx_bytes %s,'
                     ' tx_bytes %s, rx_dropped %s, tx_dropped %s,'
                     ' rx_errors %s, tx_errors %s')
        # Order matters: it is also the order of the debug message arguments.
        counters = ('rx_bytes', 'tx_bytes', 'rx_dropped', 'tx_dropped',
                    'rx_errors', 'tx_errors')

        for port_stat in ports_stats:
            self._update_controller_interface(switch, port_stat)

            statistics_to_send = {name: getattr(port_stat, name).value
                                  for name in counters}
            port_no = port_stat.port_no.value

            content = {
                'namespace': f'kytos.kronos.{switch.id}.port_no.{port_no}',
                'value': statistics_to_send,
                'callback': self._save_event_callback,
                'timestamp': time.time(),
            }
            self._app_buffer.put(
                KytosEvent(name='kytos.kronos.save', content=content))

            log.debug(debug_msg, port_no, switch.id,
                      *statistics_to_send.values())

    @staticmethod
    def _update_controller_interface(switch, port_stats):
        """Update the stats of the switch interface *port_stats* refers to.

        Nothing is done when the switch has no interface for the port
        number.
        """
        iface = switch.get_interface_by_port_no(port_stats.port_no.value)
        if iface is None:
            return
        if iface.stats is None:
            iface.stats = OFCorePortStats()
        iface.stats.update(port_stats)
329
330
331 1
class AggregateStats(Stats):
    """Deal with AggregateStats message."""

    # RRD shared by the class; the names are the data sources written by
    # :meth:`listen`.
    _rrd = RRD('aggr', ('packet_count', 'byte_count', 'flow_count'))

    def request(self, conn):
        """Ask for aggregate stats through connection *conn*."""
        body = AggregateStatsRequest()  # Port.OFPP_NONE and All Tables
        req = StatsRequest(body_type=StatsType.OFPST_AGGREGATE, body=body)
        self._send_event(req, conn)
        log.debug('Aggregate Stats request for switch %s sent.',
                  conn.switch.dpid)

    @classmethod
    def listen(cls, switch, aggregate_stats):
        """Receive aggregate stats and store them in the class RRD.

        Args:
            switch: Switch the stats belong to; its ``id`` is the RRD index.
            aggregate_stats (iterable): Items exposing ``packet_count``,
                ``byte_count`` and ``flow_count``, each with a ``value``.

        """
        # %-style placeholders: the values are passed to log.debug as
        # arguments, which are not applied to '{}' placeholders.
        debug_msg = 'Received aggregate stats from switch %s:' \
                    ' packet_count %s, byte_count %s, flow_count %s'

        for aggregate in aggregate_stats:
            # need to choose the _id to aggregate_stats
            # this class isn't used yet.
            # The RRD lives in ``_rrd``; the inherited ``rrd`` is None.
            cls._rrd.update((switch.id,),
                            packet_count=aggregate.packet_count.value,
                            byte_count=aggregate.byte_count.value,
                            flow_count=aggregate.flow_count.value)

            log.debug(debug_msg, switch.id, aggregate.packet_count.value,
                      aggregate.byte_count.value, aggregate.flow_count.value)
360
361
362 1
class FlowStats(Stats):
    """Request flow statistics and handle their replies."""

    def request(self, conn):
        """Send a flow stats request through connection *conn*."""
        stats_request = self._get_versioned_request(conn.protocol.version)
        self._send_event(stats_request, conn)
        log.debug('FlowStats request for switch %s sent.', conn.switch.id)

    @staticmethod
    def _get_versioned_request(of_version):
        """Build the flow stats request message for *of_version*.

        Version ``0x01`` gets a ``StatsRequest``; any other version gets a
        ``MultipartRequest``.
        """
        if of_version != 0x01:
            return MultipartRequest(
                multipart_type=MultipartType.OFPMP_FLOW,
                body=v0x04.FlowStatsRequest())
        return StatsRequest(
            body_type=StatsType.OFPST_FLOW,
            body=v0x01.FlowStatsRequest())

    def listen(self, switch, flows_stats):
        """Handle the flow stats replies received from *switch*.

        For each item of *flows_stats*, the stats of the matching controller
        flow (if any) are replaced and two ``kytos.kronos.save`` events are
        put in the app buffer: one with ``packet_count`` and then one with
        ``byte_count``.
        """
        flow_class = FlowFactory.get_class(switch)
        for flow_stat in flows_stats:
            flow = flow_class.from_of_flow_stats(flow_stat, switch)

            # Update controller's flow
            controller_flow = switch.get_flow_by_id(flow.id)
            if controller_flow:
                controller_flow.stats = flow.stats

            # Save each counter in its own kytos/kronos event.
            for counter in ('packet_count', 'byte_count'):
                content = {
                    'namespace':
                        f'kytos.kronos.{switch.id}.flow_id.{flow.id}',
                    'value': {counter: getattr(flow.stats, counter)},
                    'callback': self._save_event_callback,
                    'timestamp': time.time(),
                }
                self._app_buffer.put(
                    KytosEvent(name='kytos.kronos.save', content=content))
411