Test Failed
Pull Request — master (#36)
by Jose
01:45
created

build.stats.RRD._calc_start_end()   B

Complexity

Conditions 7

Size

Total Lines 19
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
eloc 14
nop 4
dl 0
loc 19
rs 8
c 0
b 0
f 0
1
"""Module with Classes to handle statistics."""
2
import time
3
from abc import ABCMeta, abstractmethod
4
from pathlib import Path
5
6
import pyof.v0x01.controller2switch.common as v0x01
7
import rrdtool
8
from kytos.core import KytosEvent, log
9
from napps.kytos.of_core.flow import PortStats as OFCorePortStats
10
# v0x01 and v0x04 PortStats are version independent
11
from napps.kytos.of_core.flow import FlowFactory
12
# Disable warning about ungrouped pyof imports due to isort
13
from pyof.v0x01.common.phy_port import Port  # pylint: disable=C0412
14
from pyof.v0x01.controller2switch.common import AggregateStatsRequest
15
from pyof.v0x01.controller2switch.stats_request import StatsRequest, StatsType
16
from pyof.v0x04.controller2switch import multipart_request as v0x04
17
from pyof.v0x04.controller2switch.common import MultipartType
18
from pyof.v0x04.controller2switch.multipart_request import MultipartRequest
19
20
from . import settings
21
22
23
class Stats(metaclass=ABCMeta):
24
    """Abstract class for Statistics implementation."""
25
26
    rrd = None
27
28
    def __init__(self, msg_out_buffer):
29
        """Store a reference to the controller's msg_out buffer.
30
31
        Args:
32
            msg_out_buffer: Where to send events.
33
        """
34
        self._buffer = msg_out_buffer
35
36
    @abstractmethod
37
    def request(self, conn):
38
        """Request statistics."""
39
        pass
40
41
    @abstractmethod
42
    def listen(self, switch, stats):
43
        """Listen statistic replies."""
44
        pass
45
46
    def _send_event(self, req, conn):
47
        event = KytosEvent(
48
            name='kytos/of_stats.messages.out.ofpt_stats_request',
49
            content={'message': req, 'destination': conn})
50
        self._buffer.put(event)
51
52
53
class RRD:
54
    """Round-robin database for keeping stats.
55
56
    It store statistics every :data:`STATS_INTERVAL`.
57
    """
58
59
    def __init__(self, app_folder, data_sources):
60
        """Specify a folder to store RRDs.
61
62
        Args:
63
            app_folder (str): Parent folder for dpids folders.
64
            data_sources (iterable): Data source names (e.g. tx_bytes,
65
                rx_bytes).
66
        """
67
        self._app = app_folder
68
        self._ds = data_sources
69
70
    def update(self, index, tstamp=None, **ds_values):
71
        """Add a row to rrd file of *dpid* and *_id*.
72
73
        Args:
74
            dpid (str): Switch dpid.
75
            index (list of str): Index for the RRD database. Examples:
76
                [dpid], [dpid, port_no], [dpid, table id, flow hash].
77
            tstamp (str, int): Unix timestamp in seconds. Defaults to now.
78
79
        Create rrd if necessary.
80
        """
81
        if tstamp is None:
82
            tstamp = 'N'
83
        rrd = self.get_or_create_rrd(index)
84
        data = ':'.join(str(ds_values[ds]) for ds in self._ds)
85
        with settings.rrd_lock:
86
            rrdtool.update(rrd, '{}:{}'.format(tstamp, data))
87
88
    def get_rrd(self, index):
89
        """Return path of the RRD file for *dpid* with *basename*.
90
91
        If rrd doesn't exist, it is *not* created.
92
93
        Args:
94
            index (iterable of str): Index for the RRD database. Examples:
95
                [dpid], [dpid, port_no], [dpid, table id, flow hash].
96
97
        Returns:
98
            str: Absolute RRD path.
99
100
        See Also:
101
            :meth:`get_or_create_rrd`
102
103
        """
104
        path = settings.DIR / self._app
105
        folders, basename = index[:-1], index[-1]
106
        for folder in folders:
107
            path = path / folder
108
        path = path / '{}.rrd'.format(basename)
109
        return str(path)
110
111
    def get_or_create_rrd(self, index, tstamp=None):
112
        """If rrd is not found, create it.
113
114
        Args:
115
            index (list of str): Index for the RRD database. Examples:
116
                [dpid], [dpid, port_no], [dpid, table id, flow hash].
117
            tstamp (str, int): Value for start argument of RRD creation.
118
        """
119
        if tstamp is None:
120
            tstamp = 'N'
121
122
        rrd = self.get_rrd(index)
123
        if not Path(rrd).exists():
124
            log.debug('Creating rrd for app %s, index %s.', self._app, index)
125
            parent = Path(rrd).parent
126
            if not parent.exists():
127
                # We may have concurrency problems creating a folder
128
                parent.mkdir(parents=True, exist_ok=True)
129
            self.create_rrd(rrd, tstamp)
130
        return rrd
131
132
    def create_rrd(self, rrd, tstamp=None):
133
        """Create an RRD file.
134
135
        Args:
136
            rrd (str): Path of RRD file to be created.
137
            tstamp (str, int): Unix timestamp in seconds for RRD creation.
138
                Defaults to now.
139
        """
140
        def get_counter(ds):
141
            """Return a DS for rrd creation."""
142
            return 'DS:{}:COUNTER:{}:{}:{}'.format(ds, settings.TIMEOUT,
143
                                                   settings.MIN, settings.MAX)
144
145
        if tstamp is None:
146
            tstamp = 'N'
147
        options = [rrd, '--start', str(tstamp), '--step',
148
                   str(settings.STATS_INTERVAL)]
149
        options.extend([get_counter(ds) for ds in self._ds])
150
        options.extend(self._get_archives())
151
        with settings.rrd_lock:
152
            rrdtool.create(*options)
153
154
    def fetch(self, index, start=None, end=None, n_points=None):
155
        """Fetch average values from rrd.
156
157
        Args:
158
            index (list of str): Index for the RRD database. Examples:
159
                [dpid], [dpid, port_no], [dpid, table id, flow hash].
160
            start (str, int): Unix timestamp in seconds for the first stats.
161
                Defaults to be old enough to have the latest n_points
162
                available (now - n_points * settings.STATS_INTERVAL).
163
            end (str, int): Unix timestamp in seconds for the last stats.
164
                Defaults to current time.
165
            n_points (int): Number of points to return. May return more if
166
                there is no matching resolution in the RRD file, or less if
167
                there is no records for all the time range.
168
                Defaults to as many points as possible.
169
170
        Returns:
171
            A tuple with:
172
173
            1. Iterator over timestamps
174
            2. Column (DS) names
175
            3. List of rows as tuples
176
177
        """
178
        rrd = self.get_rrd(index)
179
        if not Path(rrd).exists():
180
            msg = 'RRD for app {} and index {} not found'.format(self._app,
181
                                                                 index)
182
            raise FileNotFoundError(msg)
183
184
        # Use integers to calculate resolution
185
        start, end = self._calc_start_end(start, end, n_points, rrd)
186
187
        # Find the best matching resolution for returning n_points.
188
        res_args = []
189
        if n_points is not None and isinstance(start, int) \
190
                and isinstance(end, int):
191
            resolution = (end - start) // n_points
192
            if resolution > 0:
193
                res_args.extend(['-a', '-r', '{}s'.format(resolution)])
194
195
        args = [rrd, 'AVERAGE', '--start', str(start), '--end', str(end)]
196
        args.extend(res_args)
197
        with settings.rrd_lock:
198
            tstamps, cols, rows = rrdtool.fetch(*args)
199
        start, stop, step = tstamps
200
        # rrdtool range is different from Python's.
201
        return range(start + step, stop + 1, step), cols, rows
202
203
    @staticmethod
204
    def _calc_start_end(start, end, n_points, rrd):
205
        """Calculate start and end values for fetch command."""
206
        # Use integers to calculate resolution
207
        if end is None:
208
            end = int(time.time())
209
        if start is None:  # Latest n_points
210
            start = end - n_points * settings.STATS_INTERVAL
211
        elif start == 'first':  # Usually empty because 'first' is too old
212
            with settings.rrd_lock:
213
                start = rrdtool.first(rrd)
214
215
        # For RRDtool to include start and end timestamps.
216
        if isinstance(start, int):
217
            start -= 1
218
        if isinstance(end, int):
219
            end -= 1
220
221
        return start, end
222
223
    def fetch_latest(self, index):
224
        """Fetch only the value for now.
225
226
        Return zero values if there are no values recorded.
227
        """
228
        start = 'end-{}s'.format(settings.STATS_INTERVAL * 3)  # two rows
229
        try:
230
            tstamps, cols, rows = self.fetch(index, start, end='now')
231
        except FileNotFoundError:
232
            # No RRD for port, so it will return zero values
233
            return {}
234
        # Last rows may have future timestamp and be empty
235
        latest = None
236
        min_tstamp = int(time.time()) - settings.STATS_INTERVAL * 2
237
        # Search backwards for non-null values
238
        for tstamp, row in zip(tstamps[::-1], rows[::-1]):
239
            if row[0] is not None and tstamp > min_tstamp:
240
                latest = row
241
        # If no values are found, add zeros.
242
        if not latest:
243
            latest = [0] * len(cols)
244
        return {k: v for k, v in zip(cols, latest)}
245
246
    @classmethod
247
    def _get_archives(cls):
248
        """Averaged for all Data Sources."""
249
        averages = []
250
        # One month stats for the following periods:
251
        for steps in ('1m', '2m', '4m', '8m', '15m', '30m', '1h', '2h', '4h'
252
                      '8h', '12h', '1d', '2d', '3d', '6d', '10d', '15d'):
253
            averages.append('RRA:AVERAGE:{}:{}:{}'.format(settings.XFF, steps,
254
                                                          settings.PERIOD))
255
        # averages = ['RRA:AVERAGE:0:1:1d']  # More samples for testing
256
        return averages
257
258
259
class PortStats(Stats):
260
    """Deal with PortStats messages."""
261
262
    rrd = RRD('ports', [rt + 'x_' + stat for stat in
263
                        ('bytes', 'dropped', 'errors') for rt in 'rt'])
264
265
    def request(self, conn):
266
        """Ask for port stats."""
267
        request = self._get_versioned_request(conn.protocol.version)
268
        self._send_event(request, conn)
269
        log.debug('PortStats request for switch %s sent.', conn.switch.id)
270
271
    @staticmethod
272
    def _get_versioned_request(of_version):
273
        if of_version == 0x01:
274
            return StatsRequest(
275
                body_type=StatsType.OFPST_PORT,
276
                body=v0x01.PortStatsRequest(Port.OFPP_NONE))  # All ports
277
        return MultipartRequest(
278
            multipart_type=MultipartType.OFPMP_PORT_STATS,
279
            body=v0x04.PortStatsRequest())
280
281
    @classmethod
282
    def listen(cls, switch, ports_stats):
283
        """Receive port stats."""
284
        debug_msg = 'Received port %s stats of switch %s: rx_bytes %s,' \
285
                    ' tx_bytes %s, rx_dropped %s, tx_dropped %s,' \
286
                    ' rx_errors %s, tx_errors %s'
287
288
        for ps in ports_stats:
289
            cls._update_controller_interface(switch, ps)
290
            cls.rrd.update((switch.id, ps.port_no.value),
291
                           rx_bytes=ps.rx_bytes.value,
292
                           tx_bytes=ps.tx_bytes.value,
293
                           rx_dropped=ps.rx_dropped.value,
294
                           tx_dropped=ps.tx_dropped.value,
295
                           rx_errors=ps.rx_errors.value,
296
                           tx_errors=ps.tx_errors.value)
297
298
            log.debug(debug_msg, ps.port_no.value, switch.id,
299
                      ps.rx_bytes.value, ps.tx_bytes.value,
300
                      ps.rx_dropped.value, ps.tx_dropped.value,
301
                      ps.rx_errors.value, ps.tx_errors.value)
302
303
    @staticmethod
304
    def _update_controller_interface(switch, port_stats):
305
        port_no = port_stats.port_no.value
306
        iface = switch.get_interface_by_port_no(port_no)
307
        if iface is not None:
308
            if iface.stats is None:
309
                iface.stats = OFCorePortStats()
310
            iface.stats.update(port_stats)
311
312
313
class AggregateStats(Stats):
314
    """Deal with AggregateStats message."""
315
316
    _rrd = RRD('aggr', ('packet_count', 'byte_count', 'flow_count'))
317
318
    def request(self, conn):
319
        """Ask for flow stats."""
320
        body = AggregateStatsRequest()  # Port.OFPP_NONE and All Tables
321
        req = StatsRequest(body_type=StatsType.OFPST_AGGREGATE, body=body)
322
        self._send_event(req, conn)
323
        log.debug('Aggregate Stats request for switch %s sent.',
324
                  conn.switch.dpid)
325
326
    @classmethod
327
    def listen(cls, switch, aggregate_stats):
328
        """Receive flow stats."""
329
        debug_msg = 'Received aggregate stats from switch {}:' \
330
                    ' packet_count {}, byte_count {}, flow_count {}'
331
332
        for ag in aggregate_stats:
333
            # need to choose the _id to aggregate_stats
334
            # this class isn't used yet.
335
            cls.rrd.update((switch.id,),
336
                           packet_count=ag.packet_count.value,
337
                           byte_count=ag.byte_count.value,
338
                           flow_count=ag.flow_count.value)
339
340
            log.debug(debug_msg, switch.id, ag.packet_count.value,
341
                      ag.byte_count.value, ag.flow_count.value)
342
343
344
class FlowStats(Stats):
345
    """Deal with FlowStats message."""
346
347
    rrd = RRD('flows', ('packet_count', 'byte_count'))
348
349
    def request(self, conn):
350
        """Ask for flow stats."""
351
        request = self._get_versioned_request(conn.protocol.version)
352
        self._send_event(request, conn)
353
        log.debug('FlowStats request for switch %s sent.', conn.switch.id)
354
355
    @staticmethod
356
    def _get_versioned_request(of_version):
357
        if of_version == 0x01:
358
            return StatsRequest(
359
                body_type=StatsType.OFPST_FLOW,
360
                body=v0x01.FlowStatsRequest())
361
        return MultipartRequest(
362
            multipart_type=MultipartType.OFPMP_FLOW,
363
            body=v0x04.FlowStatsRequest())
364
365
    @classmethod
366
    def listen(cls, switch, flows_stats):
367
        """Receive flow stats."""
368
        flow_class = FlowFactory.get_class(switch)
369
        for fs in flows_stats:
370
            flow = flow_class.from_of_flow_stats(fs, switch)
371
372
            # Update controller's flow
373
            controller_flow = switch.get_flow_by_id(flow.id)
374
            if controller_flow:
375
                controller_flow.stats = flow.stats
376
377
            # Update RRD database
378
            cls.rrd.update((switch.id, flow.id),
379
                           packet_count=flow.stats.packet_count,
380
                           byte_count=flow.stats.byte_count)
381