Passed
Pull Request — master (#40)
by Jose
01:58
created

build.stats.Stats.request()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 2
dl 0
loc 3
ccs 1
cts 1
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
"""Module with Classes to handle statistics."""
2 1
import time
3 1
from abc import ABCMeta, abstractmethod
4 1
from pathlib import Path
5
6 1
import pyof.v0x01.controller2switch.common as v0x01
7 1
import rrdtool
8
# pylint: disable=C0411,C0412
9 1
from pyof.v0x01.common.phy_port import Port
10 1
from pyof.v0x01.controller2switch.common import AggregateStatsRequest
11 1
from pyof.v0x01.controller2switch.stats_request import StatsRequest, StatsType
12 1
from pyof.v0x04.controller2switch import multipart_request as v0x04
13 1
from pyof.v0x04.controller2switch.common import MultipartType
14 1
from pyof.v0x04.controller2switch.multipart_request import MultipartRequest
15
16 1
from kytos.core import KytosEvent, log
17
# v0x01 and v0x04 PortStats are version independent
18 1
from napps.kytos.of_core.flow import FlowFactory
19 1
from napps.kytos.of_core.flow import PortStats as OFCorePortStats
20
21
# Disable warning about ungrouped pyof imports due to isort
22 1
from . import settings
23
24
25 1
class Stats(metaclass=ABCMeta):
    """Common base for the statistics handlers of this module.

    Concrete handlers implement :meth:`request`, to ask for statistics, and
    :meth:`listen`, to process the replies.
    """

    # Placeholder; concrete handlers assign their own database here.
    rrd = None

    def __init__(self, msg_out_buffer):
        """Keep the buffer used to dispatch outgoing events.

        Args:
            msg_out_buffer: Where to send events.

        """
        self._buffer = msg_out_buffer

    @abstractmethod
    def request(self, conn):
        """Request statistics."""

    @abstractmethod
    def listen(self, switch, stats):
        """Listen statistic replies."""

    def _send_event(self, req, conn):
        """Wrap *req* in a stats-request event and put it in the buffer.

        Args:
            req: Message stored under the ``message`` key of the event.
            conn: Value stored under the ``destination`` key of the event.

        """
        payload = {'message': req, 'destination': conn}
        event_name = 'kytos/of_stats.messages.out.ofpt_stats_request'
        self._buffer.put(KytosEvent(name=event_name, content=payload))
52
53
54 1
class RRD:
    """Round-robin database for keeping stats.

    It stores statistics every :data:`STATS_INTERVAL`. All calls into
    ``rrdtool`` are made while holding ``settings.RRD_LOCK``.
    """

    def __init__(self, app_folder, data_sources):
        """Specify a folder to store RRDs.

        Args:
            app_folder (str): Parent folder for dpids folders.
            data_sources (iterable): Data source names (e.g. tx_bytes,
                rx_bytes). It is iterated more than once, so it must not be
                a one-shot iterator.

        """
        self._app = app_folder
        self._ds = data_sources

    def update(self, index, tstamp=None, **ds_values):
        """Add a row to the rrd file identified by *index*.

        Create rrd if necessary.

        Args:
            index (list of str): Index for the RRD database. Examples:
                [dpid], [dpid, port_no], [dpid, table id, flow hash].
            tstamp (str, int): Unix timestamp in seconds. Defaults to now.
            **ds_values: One value for each data source name given to the
                constructor.

        Raises:
            KeyError: If the value of a data source is missing.

        """
        if tstamp is None:
            tstamp = 'N'
        rrd = self.get_or_create_rrd(index)
        # Values must follow the order in which the data sources were created.
        data = ':'.join(str(ds_values[ds]) for ds in self._ds)
        with settings.RRD_LOCK:
            rrdtool.update(rrd, '{}:{}'.format(tstamp, data))

    def get_rrd(self, index):
        """Return path of the RRD file for *index*.

        If rrd doesn't exist, it is *not* created.

        Args:
            index (iterable of str): Index for the RRD database. Examples:
                [dpid], [dpid, port_no], [dpid, table id, flow hash].
                The last element is the file basename, the others are
                nested folders.

        Returns:
            str: RRD path, located under ``settings.DIR``.

        See Also:
            :meth:`get_or_create_rrd`

        """
        folders, basename = index[:-1], index[-1]
        path = settings.DIR / self._app
        # str() lets non-string elements (e.g. an integer port number) be
        # used as folder names, as it already happens for the basename.
        path = path.joinpath(*(str(folder) for folder in folders))
        return str(path / '{}.rrd'.format(basename))

    def get_or_create_rrd(self, index, tstamp=None):
        """If rrd is not found, create it.

        Args:
            index (list of str): Index for the RRD database. Examples:
                [dpid], [dpid, port_no], [dpid, table id, flow hash].
            tstamp (str, int): Value for start argument of RRD creation.

        Returns:
            str: Path of the (possibly just created) RRD file.

        """
        if tstamp is None:
            tstamp = 'N'

        rrd = self.get_rrd(index)
        rrd_path = Path(rrd)
        if not rrd_path.exists():
            log.debug('Creating rrd for app %s, index %s.', self._app, index)
            # exist_ok: another thread may be creating the same folder.
            rrd_path.parent.mkdir(parents=True, exist_ok=True)
            self.create_rrd(rrd, tstamp)
        return rrd

    def create_rrd(self, rrd, tstamp=None):
        """Create an RRD file.

        Args:
            rrd (str): Path of RRD file to be created.
            tstamp (str, int): Unix timestamp in seconds for RRD creation.
                Defaults to now.

        """
        def get_counter(ds_value):
            """Return a DS for rrd creation."""
            return 'DS:{}:COUNTER:{}:{}:{}'.format(ds_value, settings.TIMEOUT,
                                                   settings.MIN, settings.MAX)

        if tstamp is None:
            tstamp = 'N'
        options = [rrd, '--start', str(tstamp), '--step',
                   str(settings.STATS_INTERVAL)]
        options.extend(get_counter(ds) for ds in self._ds)
        options.extend(self._get_archives())
        with settings.RRD_LOCK:
            rrdtool.create(*options)

    # pylint: disable=R0914
    def fetch(self, index, start=None, end=None, n_points=None):
        """Fetch average values from rrd.

        Args:
            index (list of str): Index for the RRD database. Examples:
                [dpid], [dpid, port_no], [dpid, table id, flow hash].
            start (str, int): Unix timestamp in seconds for the first stats.
                Defaults to be old enough to have the latest n_points
                available (now - n_points * settings.STATS_INTERVAL) or, if
                n_points is not given either, to the first timestamp
                reported by rrdtool for the file.
            end (str, int): Unix timestamp in seconds for the last stats.
                Defaults to current time.
            n_points (int): Number of points to return. May return more if
                there is no matching resolution in the RRD file, or less if
                there is no records for all the time range.
                Defaults to as many points as possible.

        Returns:
            A tuple with:

            1. Iterator over timestamps
            2. Column (DS) names
            3. List of rows as tuples

        Raises:
            FileNotFoundError: If there is no RRD file for *index*.

        """
        rrd = self.get_rrd(index)
        if not Path(rrd).exists():
            msg = 'RRD for app {} and index {} not found'.format(self._app,
                                                                 index)
            raise FileNotFoundError(msg)

        # Use integers to calculate resolution
        start, end = self._calc_start_end(start, end, n_points, rrd)

        args = [rrd, 'AVERAGE', '--start', str(start), '--end', str(end)]
        # Find the best matching resolution for returning n_points. It can
        # only be computed from integer limits and a non-zero n_points.
        if n_points and isinstance(start, int) and isinstance(end, int):
            resolution = (end - start) // n_points
            if resolution > 0:
                args.extend(['-a', '-r', '{}s'.format(resolution)])

        with settings.RRD_LOCK:
            tstamps, cols, rows = rrdtool.fetch(*args)
        first, last, step = tstamps
        # rrdtool range is different from Python's.
        return range(first + step, last + 1, step), cols, rows

    @staticmethod
    def _calc_start_end(start, end, n_points, rrd):
        """Calculate start and end values for fetch command.

        Args:
            start (str, int): Requested start, ``'first'`` or None.
            end (str, int): Requested end or None (current time).
            n_points (int): Number of wanted points or None.
            rrd (str): Path of the RRD file, used when start is ``'first'``.

        Returns:
            tuple: ``(start, end)``. Integer values are decremented by one;
            other values (e.g. rrdtool time expressions) are left untouched.

        """
        # Use integers to calculate resolution
        if end is None:
            end = int(time.time())
        if start is None and n_points is None:
            # Neither a start nor a number of points: there is no window
            # size to compute, so use the first timestamp of the file
            # instead of failing on ``None * int``.
            start = 'first'

        if start is None:  # Latest n_points
            start = end - n_points * settings.STATS_INTERVAL
        elif start == 'first':  # Usually empty because 'first' is too old
            with settings.RRD_LOCK:
                start = rrdtool.first(rrd)

        # For RRDtool to include start and end timestamps.
        if isinstance(start, int):
            start -= 1
        if isinstance(end, int):
            end -= 1

        return start, end

    def fetch_latest(self, index):
        """Fetch only the value for now.

        Returns:
            dict: Data source names mapped to the most recent recorded
            values. Values are zero if nothing recent was recorded and the
            dict is empty if there is no RRD file for *index*.

        """
        start = 'end-{}s'.format(settings.STATS_INTERVAL * 3)  # two rows
        try:
            tstamps, cols, rows = self.fetch(index, start, end='now')
        except FileNotFoundError:
            # No RRD for this index, so there is nothing to report.
            return {}

        # Last rows may have future timestamp and be empty
        latest = None
        min_tstamp = int(time.time()) - settings.STATS_INTERVAL * 2
        # Search backwards and stop at the first (i.e. newest) non-null row.
        for tstamp, row in zip(reversed(tstamps), reversed(rows)):
            if row[0] is not None and tstamp > min_tstamp:
                latest = row
                break
        # If no values are found, add zeros.
        if not latest:
            latest = [0] * len(cols)

        return dict(zip(cols, latest))

    @classmethod
    def _get_archives(cls):
        """Return the RRA definitions: averages for all Data Sources."""
        # One month stats for the following periods:
        periods = ('1m', '2m', '4m', '8m', '15m', '30m', '1h', '2h', '4h',
                   '8h', '12h', '1d', '2d', '3d', '6d', '10d', '15d')
        # For testing, a single 'RRA:AVERAGE:0:1:1d' archive has more samples.
        return ['RRA:AVERAGE:{}:{}:{}'.format(settings.XFF, steps,
                                              settings.PERIOD)
                for steps in periods]
267
268
269 1
class PortStats(Stats):
    """Handle port statistics requests and replies."""

    # rx/tx variants of each counter; values are stored in this order.
    rrd = RRD('ports', ['rx_bytes', 'tx_bytes',
                        'rx_dropped', 'tx_dropped',
                        'rx_errors', 'tx_errors'])

    def request(self, conn):
        """Send a port stats request through *conn*."""
        message = self._get_versioned_request(conn.protocol.version)
        self._send_event(message, conn)
        log.debug('PortStats request for switch %s sent.', conn.switch.id)

    @staticmethod
    def _get_versioned_request(of_version):
        """Build the port stats request for the given OpenFlow version."""
        if of_version != 0x01:
            return MultipartRequest(
                multipart_type=MultipartType.OFPMP_PORT_STATS,
                body=v0x04.PortStatsRequest())
        # Version 0x01: OFPP_NONE asks for all ports.
        return StatsRequest(
            body_type=StatsType.OFPST_PORT,
            body=v0x01.PortStatsRequest(Port.OFPP_NONE))

    @classmethod
    def listen(cls, switch, ports_stats):
        """Store each received port stat in the interface and in the RRD."""
        debug_msg = ('Received port %s stats of switch %s: rx_bytes %s,'
                     ' tx_bytes %s, rx_dropped %s, tx_dropped %s,'
                     ' rx_errors %s, tx_errors %s')

        for port_stat in ports_stats:
            cls._update_controller_interface(switch, port_stat)

            port_no = port_stat.port_no.value
            counters = {
                'rx_bytes': port_stat.rx_bytes.value,
                'tx_bytes': port_stat.tx_bytes.value,
                'rx_dropped': port_stat.rx_dropped.value,
                'tx_dropped': port_stat.tx_dropped.value,
                'rx_errors': port_stat.rx_errors.value,
                'tx_errors': port_stat.tx_errors.value,
            }
            cls.rrd.update((switch.id, port_no), **counters)

            log.debug(debug_msg, port_no, switch.id,
                      counters['rx_bytes'], counters['tx_bytes'],
                      counters['rx_dropped'], counters['tx_dropped'],
                      counters['rx_errors'], counters['tx_errors'])

    @staticmethod
    def _update_controller_interface(switch, port_stats):
        """Update the stats of the switch interface matching *port_stats*."""
        iface = switch.get_interface_by_port_no(port_stats.port_no.value)
        if iface is None:
            # The switch has no interface for this port number.
            return
        if iface.stats is None:
            iface.stats = OFCorePortStats()
        iface.stats.update(port_stats)
321
322
323 1
class AggregateStats(Stats):
    """Deal with AggregateStats message."""

    # Must be named ``rrd``: :meth:`listen` uses ``cls.rrd`` and the value
    # inherited from ``Stats`` is None.
    rrd = RRD('aggr', ('packet_count', 'byte_count', 'flow_count'))
    # Former name of the attribute, kept for backward compatibility.
    _rrd = rrd

    def request(self, conn):
        """Ask for aggregate flow stats.

        Args:
            conn: Connection of the switch that will receive the request.

        """
        # NOTE(review): only the v0x01 request is built here, unlike the
        # versioned requests of the other handlers - confirm if intended.
        body = AggregateStatsRequest()  # Port.OFPP_NONE and All Tables
        req = StatsRequest(body_type=StatsType.OFPST_AGGREGATE, body=body)
        self._send_event(req, conn)
        log.debug('Aggregate Stats request for switch %s sent.',
                  conn.switch.id)

    @classmethod
    def listen(cls, switch, aggregate_stats):
        """Receive aggregate stats and store them in the RRD.

        Args:
            switch: Switch that sent the statistics.
            aggregate_stats (iterable): Received aggregate stats; each item
                provides packet_count, byte_count and flow_count.

        """
        # %-style placeholders, as required by the logging calls.
        debug_msg = 'Received aggregate stats from switch %s:' \
                    ' packet_count %s, byte_count %s, flow_count %s'

        for aggregate in aggregate_stats:
            # need to choose the _id to aggregate_stats
            # this class isn't used yet.
            cls.rrd.update((switch.id,),
                           packet_count=aggregate.packet_count.value,
                           byte_count=aggregate.byte_count.value,
                           flow_count=aggregate.flow_count.value)

            log.debug(debug_msg, switch.id, aggregate.packet_count.value,
                      aggregate.byte_count.value, aggregate.flow_count.value)
352
353
354 1
class FlowStats(Stats):
    """Handle flow statistics requests and replies."""

    rrd = RRD('flows', ('packet_count', 'byte_count'))

    def request(self, conn):
        """Send a flow stats request through *conn*."""
        message = self._get_versioned_request(conn.protocol.version)
        self._send_event(message, conn)
        log.debug('FlowStats request for switch %s sent.', conn.switch.id)

    @staticmethod
    def _get_versioned_request(of_version):
        """Build the flow stats request for the given OpenFlow version."""
        if of_version != 0x01:
            return MultipartRequest(
                multipart_type=MultipartType.OFPMP_FLOW,
                body=v0x04.FlowStatsRequest())
        return StatsRequest(
            body_type=StatsType.OFPST_FLOW,
            body=v0x01.FlowStatsRequest())

    @classmethod
    def listen(cls, switch, flows_stats):
        """Store received flow stats in the switch flows and in the RRD."""
        flow_class = FlowFactory.get_class(switch)
        for received in flows_stats:
            flow = flow_class.from_of_flow_stats(received, switch)

            # Refresh the stats of the flow already known by the switch.
            known_flow = switch.get_flow_by_id(flow.id)
            if known_flow:
                known_flow.stats = flow.stats

            # Record the counters, indexed by switch and flow.
            rrd_index = (switch.id, flow.id)
            counters = flow.stats
            cls.rrd.update(rrd_index,
                           packet_count=counters.packet_count,
                           byte_count=counters.byte_count)
391