Passed
Push — master ( ef307d...204120 )
by Humberto
14:51 queued 12s
created

build.stats.FlowStats.request()   A

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
cc 1
eloc 4
nop 2
dl 0
loc 5
ccs 0
cts 4
cp 0
crap 2
rs 10
c 0
b 0
f 0
1
"""Module with Classes to handle statistics."""
2
import time
3
from abc import ABCMeta, abstractmethod
4
from pathlib import Path
5
6
import pyof.v0x01.controller2switch.common as v0x01
7
import rrdtool
8
# pylint: disable=C0411,C0412
9
from pyof.v0x01.common.phy_port import Port
10
from pyof.v0x01.controller2switch.common import AggregateStatsRequest
11
from pyof.v0x01.controller2switch.stats_request import StatsRequest, StatsType
12
from pyof.v0x04.controller2switch import multipart_request as v0x04
13
from pyof.v0x04.controller2switch.common import MultipartType
14
from pyof.v0x04.controller2switch.multipart_request import MultipartRequest
15
16
from kytos.core import KytosEvent, log
17
# v0x01 and v0x04 PortStats are version independent
18
from napps.kytos.of_core.flow import FlowFactory
19
from napps.kytos.of_core.flow import PortStats as OFCorePortStats
20
21
# Disable warning about ungrouped pyof imports due to isort
22
from . import settings
23
24
25
class Stats(metaclass=ABCMeta):
    """Common base for the statistics handlers of this module."""

    # Placeholder for the round-robin database used by a concrete handler.
    rrd = None

    def __init__(self, msg_out_buffer):
        """Keep the buffer through which outgoing events are dispatched.

        Args:
            msg_out_buffer: Where to send events.

        """
        self._buffer = msg_out_buffer

    @abstractmethod
    def request(self, conn):
        """Request statistics."""

    @abstractmethod
    def listen(self, switch, stats):
        """Listen statistic replies."""

    def _send_event(self, req, conn):
        """Wrap *req* in a KytosEvent addressed to *conn* and enqueue it."""
        payload = {'message': req, 'destination': conn}
        event_name = 'kytos/of_stats.messages.out.ofpt_stats_request'
        self._buffer.put(KytosEvent(name=event_name, content=payload))
52
53
54
class RRD:
    """Round-robin database for keeping stats.

    It stores statistics every :data:`STATS_INTERVAL`.
    """

    # Consolidation steps of the AVERAGE archives created for every RRD.
    # Each one is kept for ``settings.PERIOD``.
    _ARCHIVE_STEPS = ('1m', '2m', '4m', '8m', '15m', '30m', '1h', '2h', '4h',
                      '8h', '12h', '1d', '2d', '3d', '6d', '10d', '15d')

    def __init__(self, app_folder, data_sources):
        """Specify a folder to store RRDs.

        Args:
            app_folder (str): Parent folder for dpids folders.
            data_sources (iterable): Data source names (e.g. tx_bytes,
                rx_bytes).

        """
        self._app = app_folder
        # The names are iterated on every update/creation, so materialize
        # them once; a one-shot iterator would be exhausted after first use.
        self._ds = tuple(data_sources)

    def update(self, index, tstamp=None, **ds_values):
        """Add a row to the RRD file identified by *index*.

        The RRD file is created if necessary.

        Args:
            index (list of str): Index for the RRD database. Examples:
                [dpid], [dpid, port_no], [dpid, table id, flow hash].
            tstamp (str, int): Unix timestamp in seconds. Defaults to now.
            **ds_values: One value for each data source name given to the
                constructor.

        Raises:
            KeyError: If a data source value is missing from *ds_values*.

        """
        if tstamp is None:
            tstamp = 'N'
        rrd = self.get_or_create_rrd(index)
        # Values must follow the same order used to declare the data sources.
        data = ':'.join(str(ds_values[ds]) for ds in self._ds)
        with settings.RRD_LOCK:
            rrdtool.update(rrd, '{}:{}'.format(tstamp, data))

    def get_rrd(self, index):
        """Return the path of the RRD file for *index*.

        If rrd doesn't exist, it is *not* created.

        Args:
            index (iterable of str): Index for the RRD database. Examples:
                [dpid], [dpid, port_no], [dpid, table id, flow hash].
                The last item is the file basename and the previous ones
                are nested folders.

        Returns:
            str: RRD path below ``settings.DIR``.

        See Also:
            :meth:`get_or_create_rrd`

        """
        path = settings.DIR / self._app
        folders, basename = index[:-1], index[-1]
        for folder in folders:
            # str() lets non-string index parts (e.g. numbers) be used.
            path = path / str(folder)
        path = path / '{}.rrd'.format(basename)
        return str(path)

    def get_or_create_rrd(self, index, tstamp=None):
        """If rrd is not found, create it.

        Args:
            index (list of str): Index for the RRD database. Examples:
                [dpid], [dpid, port_no], [dpid, table id, flow hash].
            tstamp (str, int): Value for start argument of RRD creation.

        Returns:
            str: Path of the (possibly just created) RRD file.

        """
        if tstamp is None:
            tstamp = 'N'

        rrd = self.get_rrd(index)
        rrd_path = Path(rrd)
        if not rrd_path.exists():
            log.debug('Creating rrd for app %s, index %s.', self._app, index)
            # exist_ok avoids failing when another thread creates the
            # folder at the same time.
            rrd_path.parent.mkdir(parents=True, exist_ok=True)
            self.create_rrd(rrd, tstamp)
        return rrd

    def create_rrd(self, rrd, tstamp=None):
        """Create an RRD file.

        Args:
            rrd (str): Path of RRD file to be created.
            tstamp (str, int): Unix timestamp in seconds for RRD creation.
                Defaults to now.

        """
        if tstamp is None:
            tstamp = 'N'
        options = [rrd, '--start', str(tstamp), '--step',
                   str(settings.STATS_INTERVAL)]
        # One COUNTER data source for each configured name.
        options.extend(
            'DS:{}:COUNTER:{}:{}:{}'.format(ds, settings.TIMEOUT,
                                            settings.MIN, settings.MAX)
            for ds in self._ds)
        options.extend(self._get_archives())
        with settings.RRD_LOCK:
            rrdtool.create(*options)

    # pylint: disable=R0914
    def fetch(self, index, start=None, end=None, n_points=None):
        """Fetch average values from rrd.

        Args:
            index (list of str): Index for the RRD database. Examples:
                [dpid], [dpid, port_no], [dpid, table id, flow hash].
            start (str, int): Unix timestamp in seconds for the first stats.
                Defaults to be old enough to have the latest n_points
                available (now - n_points * settings.STATS_INTERVAL) or,
                if n_points is not given either, to the first timestamp of
                the RRD file.
            end (str, int): Unix timestamp in seconds for the last stats.
                Defaults to current time.
            n_points (int): Number of points to return. May return more if
                there is no matching resolution in the RRD file, or less if
                there is no records for all the time range.
                Defaults to as many points as possible.

        Returns:
            A tuple with:

            1. Iterator over timestamps
            2. Column (DS) names
            3. List of rows as tuples

        Raises:
            FileNotFoundError: If there is no RRD file for *index*.

        """
        rrd = self.get_rrd(index)
        if not Path(rrd).exists():
            msg = 'RRD for app {} and index {} not found'.format(self._app,
                                                                 index)
            raise FileNotFoundError(msg)

        # Use integers to calculate resolution
        start, end = self._calc_start_end(start, end, n_points, rrd)

        # Find the best matching resolution for returning n_points.
        # A falsy n_points (None or 0) means "no preferred resolution" and
        # also avoids a division by zero.
        res_args = []
        if n_points and isinstance(start, int) and isinstance(end, int):
            resolution = (end - start) // n_points
            if resolution > 0:
                res_args.extend(['-a', '-r', '{}s'.format(resolution)])

        args = [rrd, 'AVERAGE', '--start', str(start), '--end', str(end)]
        args.extend(res_args)
        with settings.RRD_LOCK:
            tstamps, cols, rows = rrdtool.fetch(*args)
        first, last, step = tstamps
        # rrdtool range is different from Python's.
        return range(first + step, last + 1, step), cols, rows

    @staticmethod
    def _calc_start_end(start, end, n_points, rrd):
        """Calculate start and end values for fetch command.

        Integer values are decremented by one so RRDtool includes the
        requested start and end timestamps; string values (e.g. ``'now'``)
        are returned untouched.
        """
        # Use integers to calculate resolution
        if end is None:
            end = int(time.time())
        if start is None and n_points is None:
            # Nothing limits the range: go back to the first timestamp
            # instead of failing on ``None * STATS_INTERVAL``.
            start = 'first'
        if start is None:  # Latest n_points
            start = end - n_points * settings.STATS_INTERVAL
        elif start == 'first':  # Usually empty because 'first' is too old
            with settings.RRD_LOCK:
                start = rrdtool.first(rrd)

        # For RRDtool to include start and end timestamps.
        if isinstance(start, int):
            start -= 1
        if isinstance(end, int):
            end -= 1

        return start, end

    @staticmethod
    def _pick_latest(tstamps, rows, min_tstamp):
        """Return the newest non-empty row later than *min_tstamp*.

        Return None when no row matches.
        """
        # Walk from the newest row to the oldest and stop at the first hit.
        for tstamp, row in zip(reversed(tstamps), reversed(rows)):
            if row[0] is not None and tstamp > min_tstamp:
                return row
        return None

    def fetch_latest(self, index):
        """Fetch only the value for now.

        Returns:
            dict: Latest value of each data source. All values are zero if
            nothing recent was recorded. The dict is empty if there is no
            RRD file for *index*.

        """
        # Ask for a few more rows than needed; old ones are filtered below.
        start = 'end-{}s'.format(settings.STATS_INTERVAL * 3)
        try:
            tstamps, cols, rows = self.fetch(index, start, end='now')
        except FileNotFoundError:
            # No RRD for this index, so there is nothing to report.
            return {}
        # Last rows may have future timestamp and be empty
        min_tstamp = int(time.time()) - settings.STATS_INTERVAL * 2
        latest = self._pick_latest(tstamps, rows, min_tstamp)
        # If no values are found, add zeros.
        if latest is None:
            latest = [0] * len(cols)
        return dict(zip(cols, latest))

    @classmethod
    def _get_archives(cls):
        """Return the AVERAGE archive definitions for all Data Sources."""
        # The same retention period is used for every consolidation step.
        return ['RRA:AVERAGE:{}:{}:{}'.format(settings.XFF, steps,
                                              settings.PERIOD)
                for steps in cls._ARCHIVE_STEPS]
263
264
265
class PortStats(Stats):
    """Handle port statistics requests and replies."""

    # Data sources: rx/tx variants of the bytes, dropped and errors counters.
    rrd = RRD('ports', [direction + 'x_' + counter
                        for counter in ('bytes', 'dropped', 'errors')
                        for direction in 'rt'])

    def request(self, conn):
        """Ask for port stats."""
        version = conn.protocol.version
        self._send_event(self._get_versioned_request(version), conn)
        log.debug('PortStats request for switch %s sent.', conn.switch.id)

    @staticmethod
    def _get_versioned_request(of_version):
        """Build the port stats request for the given OpenFlow version."""
        if of_version != 0x01:
            return MultipartRequest(
                multipart_type=MultipartType.OFPMP_PORT_STATS,
                body=v0x04.PortStatsRequest())
        # Port.OFPP_NONE requests all ports.
        all_ports = v0x01.PortStatsRequest(Port.OFPP_NONE)
        return StatsRequest(body_type=StatsType.OFPST_PORT, body=all_ports)

    @classmethod
    def listen(cls, switch, ports_stats):
        """Receive port stats."""
        template = ('Received port %s stats of switch %s: rx_bytes %s,'
                    ' tx_bytes %s, rx_dropped %s, tx_dropped %s,'
                    ' rx_errors %s, tx_errors %s')

        for port_stat in ports_stats:
            cls._update_controller_interface(switch, port_stat)

            # Read each counter once; used for both the RRD and the log.
            port_no = port_stat.port_no.value
            rx_bytes = port_stat.rx_bytes.value
            tx_bytes = port_stat.tx_bytes.value
            rx_dropped = port_stat.rx_dropped.value
            tx_dropped = port_stat.tx_dropped.value
            rx_errors = port_stat.rx_errors.value
            tx_errors = port_stat.tx_errors.value

            cls.rrd.update((switch.id, port_no),
                           rx_bytes=rx_bytes, tx_bytes=tx_bytes,
                           rx_dropped=rx_dropped, tx_dropped=tx_dropped,
                           rx_errors=rx_errors, tx_errors=tx_errors)

            log.debug(template, port_no, switch.id, rx_bytes, tx_bytes,
                      rx_dropped, tx_dropped, rx_errors, tx_errors)

    @staticmethod
    def _update_controller_interface(switch, port_stats):
        """Copy *port_stats* into the matching interface of *switch*."""
        iface = switch.get_interface_by_port_no(port_stats.port_no.value)
        if iface is None:
            # No interface for this port: nothing to update.
            return
        if iface.stats is None:
            iface.stats = OFCorePortStats()
        iface.stats.update(port_stats)
317
318
319
class AggregateStats(Stats):
    """Deal with AggregateStats message."""

    # Must be named ``rrd``: ``listen`` reads ``cls.rrd``, which would
    # otherwise resolve to the ``None`` placeholder inherited from Stats.
    rrd = RRD('aggr', ('packet_count', 'byte_count', 'flow_count'))
    # Former attribute name, kept for backward compatibility.
    _rrd = rrd

    def request(self, conn):
        """Ask for aggregate stats.

        Args:
            conn: Connection of the switch that will receive the request.

        """
        # NOTE(review): unlike PortStats and FlowStats, the request is always
        # built with the v0x01 classes - confirm whether v0x04 is needed.
        body = AggregateStatsRequest()  # Port.OFPP_NONE and All Tables
        req = StatsRequest(body_type=StatsType.OFPST_AGGREGATE, body=body)
        self._send_event(req, conn)
        log.debug('Aggregate Stats request for switch %s sent.',
                  conn.switch.dpid)

    @classmethod
    def listen(cls, switch, aggregate_stats):
        """Receive aggregate stats and store them in the RRD.

        Args:
            switch: Switch that sent the statistics.
            aggregate_stats (iterable): Aggregate stats replies.

        """
        # log.debug formats its arguments with %, so %s placeholders are
        # required (as in the other log calls of this module).
        debug_msg = 'Received aggregate stats from switch %s:' \
                    ' packet_count %s, byte_count %s, flow_count %s'

        for aggregate in aggregate_stats:
            # need to choose the _id to aggregate_stats
            # this class isn't used yet.
            packet_count = aggregate.packet_count.value
            byte_count = aggregate.byte_count.value
            flow_count = aggregate.flow_count.value

            cls.rrd.update((switch.id,),
                           packet_count=packet_count,
                           byte_count=byte_count,
                           flow_count=flow_count)

            log.debug(debug_msg, switch.id, packet_count, byte_count,
                      flow_count)
348
349
350
class FlowStats(Stats):
    """Handle flow statistics requests and replies."""

    rrd = RRD('flows', ('packet_count', 'byte_count'))

    def request(self, conn):
        """Ask for flow stats."""
        version = conn.protocol.version
        self._send_event(self._get_versioned_request(version), conn)
        log.debug('FlowStats request for switch %s sent.', conn.switch.id)

    @staticmethod
    def _get_versioned_request(of_version):
        """Build the flow stats request for the given OpenFlow version."""
        if of_version != 0x01:
            return MultipartRequest(multipart_type=MultipartType.OFPMP_FLOW,
                                    body=v0x04.FlowStatsRequest())
        return StatsRequest(body_type=StatsType.OFPST_FLOW,
                            body=v0x01.FlowStatsRequest())

    @classmethod
    def listen(cls, switch, flows_stats):
        """Receive flow stats."""
        flow_class = FlowFactory.get_class(switch)
        for of_stat in flows_stats:
            parsed = flow_class.from_of_flow_stats(of_stat, switch)

            # Keep the controller's copy of the flow in sync, if it has one.
            known_flow = switch.get_flow_by_id(parsed.id)
            if known_flow:
                known_flow.stats = parsed.stats

            # Persist the counters in the round-robin database.
            counters = parsed.stats
            cls.rrd.update((switch.id, parsed.id),
                           packet_count=counters.packet_count,
                           byte_count=counters.byte_count)
387