Passed
Push — master ( 8301e9...ef307d )
by Humberto
01:11 queued 12s
created

build.stats.AggregateStats.request()   A

Complexity

Conditions 1

Size

Total Lines 7
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.512

Importance

Changes 0
Metric Value
cc 1
eloc 6
nop 2
dl 0
loc 7
ccs 1
cts 5
cp 0.2
crap 1.512
rs 10
c 0
b 0
f 0
1
"""Module with Classes to handle statistics."""
2 1
import time
3 1
from abc import ABCMeta, abstractmethod
4 1
from pathlib import Path
5
6 1
import pyof.v0x01.controller2switch.common as v0x01
7 1
import rrdtool
8
# pylint: disable=C0411,C0412
9 1
from pyof.v0x01.common.phy_port import Port
10 1
from pyof.v0x01.controller2switch.common import AggregateStatsRequest
11 1
from pyof.v0x01.controller2switch.stats_request import StatsRequest, StatsType
12 1
from pyof.v0x04.controller2switch import multipart_request as v0x04
13 1
from pyof.v0x04.controller2switch.common import MultipartType
14 1
from pyof.v0x04.controller2switch.multipart_request import MultipartRequest
15
16 1
from kytos.core import KytosEvent, log
17
# v0x01 and v0x04 PortStats are version independent
18 1
from napps.kytos.of_core.flow import FlowFactory
19 1
from napps.kytos.of_core.flow import PortStats as OFCorePortStats
20
21
# Disable warning about ungrouped pyof imports due to isort
22 1
from . import settings
23
24
25 1
class Stats(metaclass=ABCMeta):
26
    """Abstract class for Statistics implementation."""
27
28 1
    rrd = None
29
30 1
    def __init__(self, msg_out_buffer):
31
        """Store a reference to the controller's msg_out buffer.
32
33
        Args:
34
            msg_out_buffer: Where to send events.
35
36
        """
37
        self._buffer = msg_out_buffer
38
39 1
    @abstractmethod
40
    def request(self, conn):
41
        """Request statistics."""
42
43 1
    @abstractmethod
44
    def listen(self, switch, stats):
45
        """Listen statistic replies."""
46
47 1
    def _send_event(self, req, conn):
48
        event = KytosEvent(
49
            name='kytos/of_stats.messages.out.ofpt_stats_request',
50
            content={'message': req, 'destination': conn})
51
        self._buffer.put(event)
52
53
54 1
class RRD:
55
    """Round-robin database for keeping stats.
56
57
    It store statistics every :data:`STATS_INTERVAL`.
58
    """
59
60 1
    def __init__(self, app_folder, data_sources):
61
        """Specify a folder to store RRDs.
62
63
        Args:
64
            app_folder (str): Parent folder for dpids folders.
65
            data_sources (iterable): Data source names (e.g. tx_bytes,
66
                rx_bytes).
67
68
        """
69 1
        self._app = app_folder
70 1
        self._ds = data_sources
71
72 1
    def update(self, index, tstamp=None, **ds_values):
73
        """Add a row to rrd file of *dpid* and *_id*.
74
75
        Args:
76
            dpid (str): Switch dpid.
77
            index (list of str): Index for the RRD database. Examples:
78
                [dpid], [dpid, port_no], [dpid, table id, flow hash].
79
            tstamp (str, int): Unix timestamp in seconds. Defaults to now.
80
81
        Create rrd if necessary.
82
83
        """
84 1
        if tstamp is None:
85
            tstamp = 'N'
86 1
        rrd = self.get_or_create_rrd(index)
87 1
        data = ':'.join(str(ds_values[ds]) for ds in self._ds)
88 1
        with settings.RRD_LOCK:
89 1
            rrdtool.update(rrd, '{}:{}'.format(tstamp, data))
90
91 1
    def get_rrd(self, index):
92
        """Return path of the RRD file for *dpid* with *basename*.
93
94
        If rrd doesn't exist, it is *not* created.
95
96
        Args:
97
            index (iterable of str): Index for the RRD database. Examples:
98
                [dpid], [dpid, port_no], [dpid, table id, flow hash].
99
100
        Returns:
101
            str: Absolute RRD path.
102
103
        See Also:
104
            :meth:`get_or_create_rrd`
105
106
        """
107 1
        path = settings.DIR / self._app
108 1
        folders, basename = index[:-1], index[-1]
109 1
        for folder in folders:
110 1
            path = path / folder
111 1
        path = path / '{}.rrd'.format(basename)
112 1
        return str(path)
113
114 1
    def get_or_create_rrd(self, index, tstamp=None):
115
        """If rrd is not found, create it.
116
117
        Args:
118
            index (list of str): Index for the RRD database. Examples:
119
                [dpid], [dpid, port_no], [dpid, table id, flow hash].
120
            tstamp (str, int): Value for start argument of RRD creation.
121
122
        """
123 1
        if tstamp is None:
124 1
            tstamp = 'N'
125
126 1
        rrd = self.get_rrd(index)
127 1
        if not Path(rrd).exists():
128
            log.debug('Creating rrd for app %s, index %s.', self._app, index)
129
            parent = Path(rrd).parent
130
            if not parent.exists():
131
                # We may have concurrency problems creating a folder
132
                parent.mkdir(parents=True, exist_ok=True)
133
            self.create_rrd(rrd, tstamp)
134 1
        return rrd
135
136 1
    def create_rrd(self, rrd, tstamp=None):
137
        """Create an RRD file.
138
139
        Args:
140
            rrd (str): Path of RRD file to be created.
141
            tstamp (str, int): Unix timestamp in seconds for RRD creation.
142
                Defaults to now.
143
144
        """
145 1
        def get_counter(ds_value):
146
            """Return a DS for rrd creation."""
147 1
            return 'DS:{}:COUNTER:{}:{}:{}'.format(ds_value, settings.TIMEOUT,
148
                                                   settings.MIN, settings.MAX)
149
150 1
        if tstamp is None:
151
            tstamp = 'N'
152 1
        options = [rrd, '--start', str(tstamp), '--step',
153
                   str(settings.STATS_INTERVAL)]
154 1
        options.extend([get_counter(ds) for ds in self._ds])
155 1
        options.extend(self._get_archives())
156 1
        with settings.RRD_LOCK:
157 1
            rrdtool.create(*options)
158
159
    # pylint: disable=R0914
160 1
    def fetch(self, index, start=None, end=None, n_points=None):
161
        """Fetch average values from rrd.
162
163
        Args:
164
            index (list of str): Index for the RRD database. Examples:
165
                [dpid], [dpid, port_no], [dpid, table id, flow hash].
166
            start (str, int): Unix timestamp in seconds for the first stats.
167
                Defaults to be old enough to have the latest n_points
168
                available (now - n_points * settings.STATS_INTERVAL).
169
            end (str, int): Unix timestamp in seconds for the last stats.
170
                Defaults to current time.
171
            n_points (int): Number of points to return. May return more if
172
                there is no matching resolution in the RRD file, or less if
173
                there is no records for all the time range.
174
                Defaults to as many points as possible.
175
176
        Returns:
177
            A tuple with:
178
179
            1. Iterator over timestamps
180
            2. Column (DS) names
181
            3. List of rows as tuples
182
183
        """
184 1
        rrd = self.get_rrd(index)
185 1
        if not Path(rrd).exists():
186 1
            msg = 'RRD for app {} and index {} not found'.format(self._app,
187
                                                                 index)
188 1
            raise FileNotFoundError(msg)
189
190
        # Use integers to calculate resolution
191 1
        start, end = self._calc_start_end(start, end, n_points, rrd)
192
193
        # Find the best matching resolution for returning n_points.
194 1
        res_args = []
195 1
        if n_points is not None and isinstance(start, int) \
196
                and isinstance(end, int):
197
            resolution = (end - start) // n_points
198
            if resolution > 0:
199
                res_args.extend(['-a', '-r', '{}s'.format(resolution)])
200
201 1
        args = [rrd, 'AVERAGE', '--start', str(start), '--end', str(end)]
202 1
        args.extend(res_args)
203 1
        with settings.RRD_LOCK:
204 1
            tstamps, cols, rows = rrdtool.fetch(*args)
205 1
        start, stop, step = tstamps
206
        # rrdtool range is different from Python's.
207 1
        return range(start + step, stop + 1, step), cols, rows
208
209 1
    @staticmethod
210
    def _calc_start_end(start, end, n_points, rrd):
211
        """Calculate start and end values for fetch command."""
212
        # Use integers to calculate resolution
213 1
        if end is None:
214
            end = int(time.time())
215 1
        if start is None:  # Latest n_points
216
            start = end - n_points * settings.STATS_INTERVAL
217 1
        elif start == 'first':  # Usually empty because 'first' is too old
218
            with settings.RRD_LOCK:
219
                start = rrdtool.first(rrd)
220
221
        # For RRDtool to include start and end timestamps.
222 1
        if isinstance(start, int):
223 1
            start -= 1
224 1
        if isinstance(end, int):
225 1
            end -= 1
226
227 1
        return start, end
228
229 1
    def fetch_latest(self, index):
230
        """Fetch only the value for now.
231
232
        Return zero values if there are no values recorded.
233
        """
234 1
        start = 'end-{}s'.format(settings.STATS_INTERVAL * 3)  # two rows
235 1
        try:
236 1
            tstamps, cols, rows = self.fetch(index, start, end='now')
237 1
        except FileNotFoundError:
238
            # No RRD for port, so it will return zero values
239 1
            return {}
240
        # Last rows may have future timestamp and be empty
241
        latest = None
242
        min_tstamp = int(time.time()) - settings.STATS_INTERVAL * 2
243
        # Search backwards for non-null values
244
        for tstamp, row in zip(tstamps[::-1], rows[::-1]):
245
            if row[0] is not None and tstamp > min_tstamp:
246
                latest = row
247
        # If no values are found, add zeros.
248
        if not latest:
249
            latest = [0] * len(cols)
250
        return dict(zip(cols, latest))
251
252 1
    @classmethod
253
    def _get_archives(cls):
254
        """Averaged for all Data Sources."""
255 1
        averages = []
256
        # One month stats for the following periods:
257 1
        for steps in ('1m', '2m', '4m', '8m', '15m', '30m', '1h', '2h', '4h'
258
                      '8h', '12h', '1d', '2d', '3d', '6d', '10d', '15d'):
259 1
            averages.append('RRA:AVERAGE:{}:{}:{}'.format(settings.XFF, steps,
260
                                                          settings.PERIOD))
261
        # averages = ['RRA:AVERAGE:0:1:1d']  # More samples for testing
262 1
        return averages
263
264
265 1
class PortStats(Stats):
266
    """Deal with PortStats messages."""
267
268 1
    rrd = RRD('ports', [rt + 'x_' + stat for stat in
269
                        ('bytes', 'dropped', 'errors') for rt in 'rt'])
270
271 1
    def request(self, conn):
272
        """Ask for port stats."""
273
        request = self._get_versioned_request(conn.protocol.version)
274
        self._send_event(request, conn)
275
        log.debug('PortStats request for switch %s sent.', conn.switch.id)
276
277 1
    @staticmethod
278
    def _get_versioned_request(of_version):
279
        if of_version == 0x01:
280
            return StatsRequest(
281
                body_type=StatsType.OFPST_PORT,
282
                body=v0x01.PortStatsRequest(Port.OFPP_NONE))  # All ports
283
        return MultipartRequest(
284
            multipart_type=MultipartType.OFPMP_PORT_STATS,
285
            body=v0x04.PortStatsRequest())
286
287 1
    @classmethod
288
    def listen(cls, switch, ports_stats):
289
        """Receive port stats."""
290
        debug_msg = 'Received port %s stats of switch %s: rx_bytes %s,' \
291
                    ' tx_bytes %s, rx_dropped %s, tx_dropped %s,' \
292
                    ' rx_errors %s, tx_errors %s'
293
294
        for port_stat in ports_stats:
295
            cls._update_controller_interface(switch, port_stat)
296
            cls.rrd.update((switch.id, port_stat.port_no.value),
297
                           rx_bytes=port_stat.rx_bytes.value,
298
                           tx_bytes=port_stat.tx_bytes.value,
299
                           rx_dropped=port_stat.rx_dropped.value,
300
                           tx_dropped=port_stat.tx_dropped.value,
301
                           rx_errors=port_stat.rx_errors.value,
302
                           tx_errors=port_stat.tx_errors.value)
303
304
            log.debug(debug_msg, port_stat.port_no.value, switch.id,
305
                      port_stat.rx_bytes.value, port_stat.tx_bytes.value,
306
                      port_stat.rx_dropped.value, port_stat.tx_dropped.value,
307
                      port_stat.rx_errors.value, port_stat.tx_errors.value)
308
309 1
    @staticmethod
310
    def _update_controller_interface(switch, port_stats):
311
        port_no = port_stats.port_no.value
312
        iface = switch.get_interface_by_port_no(port_no)
313
        if iface is not None:
314
            if iface.stats is None:
315
                iface.stats = OFCorePortStats()
316
            iface.stats.update(port_stats)
317
318
319 1
class AggregateStats(Stats):
320
    """Deal with AggregateStats message."""
321
322 1
    _rrd = RRD('aggr', ('packet_count', 'byte_count', 'flow_count'))
323
324 1
    def request(self, conn):
325
        """Ask for flow stats."""
326
        body = AggregateStatsRequest()  # Port.OFPP_NONE and All Tables
327
        req = StatsRequest(body_type=StatsType.OFPST_AGGREGATE, body=body)
328
        self._send_event(req, conn)
329
        log.debug('Aggregate Stats request for switch %s sent.',
330
                  conn.switch.dpid)
331
332 1
    @classmethod
333
    def listen(cls, switch, aggregate_stats):
334
        """Receive flow stats."""
335
        debug_msg = 'Received aggregate stats from switch {}:' \
336
                    ' packet_count {}, byte_count {}, flow_count {}'
337
338
        for aggregate in aggregate_stats:
339
            # need to choose the _id to aggregate_stats
340
            # this class isn't used yet.
341
            cls.rrd.update((switch.id,),
342
                           packet_count=aggregate.packet_count.value,
343
                           byte_count=aggregate.byte_count.value,
344
                           flow_count=aggregate.flow_count.value)
345
346
            log.debug(debug_msg, switch.id, aggregate.packet_count.value,
347
                      aggregate.byte_count.value, aggregate.flow_count.value)
348
349
350 1
class FlowStats(Stats):
351
    """Deal with FlowStats message."""
352
353 1
    rrd = RRD('flows', ('packet_count', 'byte_count'))
354
355 1
    def request(self, conn):
356
        """Ask for flow stats."""
357
        request = self._get_versioned_request(conn.protocol.version)
358
        self._send_event(request, conn)
359
        log.debug('FlowStats request for switch %s sent.', conn.switch.id)
360
361 1
    @staticmethod
362
    def _get_versioned_request(of_version):
363
        if of_version == 0x01:
364
            return StatsRequest(
365
                body_type=StatsType.OFPST_FLOW,
366
                body=v0x01.FlowStatsRequest())
367
        return MultipartRequest(
368
            multipart_type=MultipartType.OFPMP_FLOW,
369
            body=v0x04.FlowStatsRequest())
370
371 1
    @classmethod
372
    def listen(cls, switch, flows_stats):
373
        """Receive flow stats."""
374
        flow_class = FlowFactory.get_class(switch)
375
        for flow_stat in flows_stats:
376
            flow = flow_class.from_of_flow_stats(flow_stat, switch)
377
378
            # Update controller's flow
379
            controller_flow = switch.get_flow_by_id(flow.id)
380
            if controller_flow:
381
                controller_flow.stats = flow.stats
382
383
            # Update RRD database
384
            cls.rrd.update((switch.id, flow.id),
385
                           packet_count=flow.stats.packet_count,
386
                           byte_count=flow.stats.byte_count)
387