Test Failed
Pull Request — master (#33)
by
unknown
02:21
created

PortStats._update_controller_interface()   A

Complexity

Conditions 3

Size

Total Lines 8
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 8
nop 2
dl 0
loc 8
rs 10
c 0
b 0
f 0
1
"""Module with Classes to handle statistics."""
2
import time
3
from abc import ABCMeta, abstractmethod
4
from pathlib import Path
5
6
import rrdtool
7
from kytos.core import KytosEvent, log
8
# v0x01 and v0x04 PortStats are version independent
9
from pyof.v0x01.common.phy_port import Port  # pylint: disable=C0412
10
from pyof.v0x01.controller2switch.common import (AggregateStatsRequest,
11
                                                 MultipartType, v0x01)
12
from pyof.v0x01.controller2switch.stats_request import StatsRequest, StatsType
13
from pyof.v0x04.controller2switch import multipart_request as MultipartRequest
14
# Disable warning about ungrouped pyof imports due to isort
15
from pyof.v0x04.controller2switch import v0x04
16
from napps.kytos.of_core.flow import FlowFactory
17
from napps.kytos.of_core.flow import PortStats as OFCorePortStats
18
19
from . import settings
20
21
22
class Stats(metaclass=ABCMeta):
23
    """Abstract class for Statistics implementation."""
24
25
    rrd = None
26
27
    def __init__(self, msg_out_buffer):
28
        """Store a reference to the controller's msg_out buffer.
29
30
        Args:
31
            msg_out_buffer: Where to send events.
32
        """
33
        self._buffer = msg_out_buffer
34
35
    @abstractmethod
36
    def request(self, conn):
37
        """Request statistics."""
38
39
    @abstractmethod
40
    def listen(self, switch, stats):
41
        """Listen statistic replies."""
42
43
    def _send_event(self, req, conn):
44
        event = KytosEvent(
45
            name='kytos/of_stats.messages.out.ofpt_stats_request',
46
            content={'message': req, 'destination': conn})
47
        self._buffer.put(event)
48
49
50
class RRD:
51
    """Round-robin database for keeping stats.
52
53
    It store statistics every :data:`STATS_INTERVAL`.
54
    """
55
56
    def __init__(self, app_folder, data_sources):
57
        """Specify a folder to store RRDs.
58
59
        Args:
60
            app_folder (str): Parent folder for dpids folders.
61
            data_sources (iterable): Data source names (e.g. tx_bytes,
62
                rx_bytes).
63
        """
64
        self._app = app_folder
65
        self._ds = data_sources
66
67
    def update(self, index, tstamp=None, **ds_values):
68
        """Add a row to rrd file of *dpid* and *_id*.
69
70
        Args:
71
            dpid (str): Switch dpid.
72
            index (list of str): Index for the RRD database. Examples:
73
                [dpid], [dpid, port_no], [dpid, table id, flow hash].
74
            tstamp (str, int): Unix timestamp in seconds. Defaults to now.
75
76
        Create rrd if necessary.
77
        """
78
        if tstamp is None:
79
            tstamp = 'N'
80
        rrd = self.get_or_create_rrd(index)
81
        data = ':'.join(str(ds_values[ds]) for ds in self._ds)
82
        with settings.RDD_LOCK:
83
            rrdtool.update(rrd, '{}:{}'.format(tstamp, data))
84
85
    def get_rrd(self, index):
86
        """Return path of the RRD file for *dpid* with *basename*.
87
88
        If rrd doesn't exist, it is *not* created.
89
90
        Args:
91
            index (iterable of str): Index for the RRD database. Examples:
92
                [dpid], [dpid, port_no], [dpid, table id, flow hash].
93
94
        Returns:
95
            str: Absolute RRD path.
96
97
        See Also:
98
            :meth:`get_or_create_rrd`
99
100
        """
101
        path = settings.DIR / self._app
102
        folders, basename = index[:-1], index[-1]
103
        for folder in folders:
104
            path = path / folder
105
        path = path / '{}.rrd'.format(basename)
106
        return str(path)
107
108
    def get_or_create_rrd(self, index, tstamp=None):
109
        """If rrd is not found, create it.
110
111
        Args:
112
            index (list of str): Index for the RRD database. Examples:
113
                [dpid], [dpid, port_no], [dpid, table id, flow hash].
114
            tstamp (str, int): Value for start argument of RRD creation.
115
        """
116
        if tstamp is None:
117
            tstamp = 'N'
118
119
        rrd = self.get_rrd(index)
120
        if not Path(rrd).exists():
121
            log.debug('Creating rrd for app %s, index %s.', self._app, index)
122
            parent = Path(rrd).parent
123
            if not parent.exists():
124
                # We may have concurrency problems creating a folder
125
                parent.mkdir(parents=True, exist_ok=True)
126
            self.create_rrd(rrd, tstamp)
127
        return rrd
128
129
    def create_rrd(self, rrd, tstamp=None):
130
        """Create an RRD file.
131
132
        Args:
133
            rrd (str): Path of RRD file to be created.
134
            tstamp (str, int): Unix timestamp in seconds for RRD creation.
135
                Defaults to now.
136
        """
137
        def get_counter(ds_arg):
138
            """Return a DS for rrd creation."""
139
            return 'DS:{}:COUNTER:{}:{}:{}'.format(ds_arg, settings.TIMEOUT,
140
                                                   settings.MIN, settings.MAX)
141
142
        if tstamp is None:
143
            tstamp = 'N'
144
        options = [rrd, '--start', str(tstamp), '--step',
145
                   str(settings.STATS_INTERVAL)]
146
        options.extend([get_counter(ds) for ds in self._ds])
147
        options.extend(self._get_archives())
148
        with settings.RDD_LOCK:
149
            rrdtool.create(*options)
150
151
    def fetch(self, index, start=None, end=None, n_points=None):
152
        """Fetch average values from rrd.
153
154
        Args:
155
            index (list of str): Index for the RRD database. Examples:
156
                [dpid], [dpid, port_no], [dpid, table id, flow hash].
157
            start (str, int): Unix timestamp in seconds for the first stats.
158
                Defaults to be old enough to have the latest n_points
159
                available (now - n_points * settings.STATS_INTERVAL).
160
            end (str, int): Unix timestamp in seconds for the last stats.
161
                Defaults to current time.
162
            n_points (int): Number of points to return. May return more if
163
                there is no matching resolution in the RRD file, or less if
164
                there is no records for all the time range.
165
                Defaults to as many points as possible.
166
167
        Returns:
168
            A tuple with:
169
170
            1. Iterator over timestamps
171
            2. Column (DS) names
172
            3. List of rows as tuples
173
174
        """
175
        rrd = self.get_rrd(index)
176
        if not Path(rrd).exists():
177
            msg = 'RRD for app {} and index {} not found'.format(self._app,
178
                                                                 index)
179
            raise FileNotFoundError(msg)
180
181
        # Use integers to calculate resolution
182
        start, end = self._calc_start_end(start, end, n_points, rrd)
183
184
        # Find the best matching resolution for returning n_points.
185
        res_args = []
186
        if n_points is not None and isinstance(start, int) \
187
                and isinstance(end, int):
188
            resolution = (end - start) // n_points
189
            if resolution > 0:
190
                res_args.extend(['-a', '-r', '{}s'.format(resolution)])
191
192
        args = [rrd, 'AVERAGE', '--start', str(start), '--end', str(end)]
193
        args.extend(res_args)
194
        with settings.RDD_LOCK:
195
            tstamps, cols, rows = rrdtool.fetch(*args)
196
        start, stop, step = tstamps
197
        # rrdtool range is different from Python's.
198
        return range(start + step, stop + 1, step), cols, rows
199
200
    @staticmethod
201
    def _calc_start_end(start, end, n_points, rrd):
202
        """Calculate start and end values for fetch command."""
203
        # Use integers to calculate resolution
204
        if end is None:
205
            end = int(time.time())
206
        if start is None:  # Latest n_points
207
            start = end - n_points * settings.STATS_INTERVAL
208
        elif start == 'first':  # Usually empty because 'first' is too old
209
            with settings.RDD_LOCK:
210
                start = rrdtool.first(rrd)
211
212
        # For RRDtool to include start and end timestamps.
213
        if isinstance(start, int):
214
            start -= 1
215
        if isinstance(end, int):
216
            end -= 1
217
218
        return start, end
219
220
    def fetch_latest(self, index):
221
        """Fetch only the value for now.
222
223
        Return zero values if there are no values recorded.
224
        """
225
        start = 'end-{}s'.format(settings.STATS_INTERVAL * 3)  # two rows
226
        try:
227
            tstamps, cols, rows = self.fetch(index, start, end='now')
228
        except FileNotFoundError:
229
            # No RRD for port, so it will return zero values
230
            return {}
231
        # Last rows may have future timestamp and be empty
232
        latest = None
233
        min_tstamp = int(time.time()) - settings.STATS_INTERVAL * 2
234
        # Search backwards for non-null values
235
        for tstamp, row in zip(tstamps[::-1], rows[::-1]):
236
            if row[0] is not None and tstamp > min_tstamp:
237
                latest = row
238
        # If no values are found, add zeros.
239
        if not latest:
240
            latest = [0] * len(cols)
241
        return {k: v for k, v in zip(cols, latest)}
242
243
    @classmethod
244
    def _get_archives(cls):
245
        """Averaged for all Data Sources."""
246
        averages = []
247
        # One month stats for the following periods:
248
        for steps in ('1m', '2m', '4m', '8m', '15m', '30m', '1h', '2h', '4h'
249
                      '8h', '12h', '1d', '2d', '3d', '6d', '10d', '15d'):
250
            averages.append('RRA:AVERAGE:{}:{}:{}'.format(settings.XFF, steps,
251
                                                          settings.PERIOD))
252
        # averages = ['RRA:AVERAGE:0:1:1d']  # More samples for testing
253
        return averages
254
255
256
class PortStats(Stats):
257
    """Deal with PortStats messages."""
258
259
    rrd = RRD('ports', [rt + 'x_' + stat for stat in
260
                        ('bytes', 'dropped', 'errors') for rt in 'rt'])
261
262
    def request(self, conn):
263
        """Ask for port stats."""
264
        request = self._get_versioned_request(conn.protocol.version)
265
        self._send_event(request, conn)
266
        log.debug('PortStats request for switch %s sent.', conn.switch.id)
267
268
    @staticmethod
269
    def _get_versioned_request(of_version):
270
        if of_version == 0x01:
271
            return StatsRequest(
272
                body_type=StatsType.OFPST_PORT,
273
                body=v0x01.PortStatsRequest(Port.OFPP_NONE))  # All ports
274
        return MultipartRequest(
275
            multipart_type=MultipartType.OFPMP_PORT_STATS,
276
            body=v0x04.PortStatsRequest())
277
278
    @classmethod
279
    def listen(cls, switch, ports_stats):
280
        """Receive port stats."""
281
        debug_msg = 'Received port %s stats of switch %s: rx_bytes %s,' \
282
                    ' tx_bytes %s, rx_dropped %s, tx_dropped %s,' \
283
                    ' rx_errors %s, tx_errors %s'
284
285
        for port_stats in ports_stats:
286
            cls._update_controller_interface(switch, port_stats)
287
            cls.rrd.update((switch.id, port_stats.port_no.value),
288
                           rx_bytes=port_stats.rx_bytes.value,
289
                           tx_bytes=port_stats.tx_bytes.value,
290
                           rx_dropped=port_stats.rx_dropped.value,
291
                           tx_dropped=port_stats.tx_dropped.value,
292
                           rx_errors=port_stats.rx_errors.value,
293
                           tx_errors=port_stats.tx_errors.value)
294
295
            log.debug(debug_msg, port_stats.port_no.value, switch.id,
296
                      port_stats.rx_bytes.value, port_stats.tx_bytes.value,
297
                      port_stats.rx_dropped.value, port_stats.tx_dropped.value,
298
                      port_stats.rx_errors.value, port_stats.tx_errors.value)
299
300
    @staticmethod
301
    def _update_controller_interface(switch, port_stats):
302
        port_no = port_stats.port_no.value
303
        iface = switch.get_interface_by_port_no(port_no)
304
        if iface is not None:
305
            if iface.stats is None:
306
                iface.stats = OFCorePortStats()
307
            iface.stats.update(port_stats)
308
309
310
class AggregateStats(Stats):
311
    """Deal with AggregateStats message."""
312
313
    _rrd = RRD('aggr', ('packet_count', 'byte_count', 'flow_count'))
314
315
    def request(self, conn):
316
        """Ask for flow stats."""
317
        body = AggregateStatsRequest()  # Port.OFPP_NONE and All Tables
318
        req = StatsRequest(body_type=StatsType.OFPST_AGGREGATE, body=body)
319
        self._send_event(req, conn)
320
        log.debug('Aggregate Stats request for switch %s sent.',
321
                  conn.switch.dpid)
322
323
    @classmethod
324
    def listen(cls, switch, aggregate_stats):
325
        """Receive flow stats."""
326
        debug_msg = 'Received aggregate stats from switch {}:' \
327
                    ' packet_count {}, byte_count {}, flow_count {}'
328
329
        for ag_stats in aggregate_stats:
330
            # need to choose the _id to aggregate_stats
331
            # this class isn't used yet.
332
            cls.rrd.update((switch.id,),
333
                           packet_count=ag_stats.packet_count.value,
334
                           byte_count=ag_stats.byte_count.value,
335
                           flow_count=ag_stats.flow_count.value)
336
337
            log.debug(debug_msg, switch.id, ag_stats.packet_count.value,
338
                      ag_stats.byte_count.value, ag_stats.flow_count.value)
339
340
341
class FlowStats(Stats):
342
    """Deal with FlowStats message."""
343
344
    rrd = RRD('flows', ('packet_count', 'byte_count'))
345
346
    def request(self, conn):
347
        """Ask for flow stats."""
348
        request = self._get_versioned_request(conn.protocol.version)
349
        self._send_event(request, conn)
350
        log.debug('FlowStats request for switch %s sent.', conn.switch.id)
351
352
    @staticmethod
353
    def _get_versioned_request(of_version):
354
        if of_version == 0x01:
355
            return StatsRequest(
356
                body_type=StatsType.OFPST_FLOW,
357
                body=v0x01.FlowStatsRequest())
358
        return MultipartRequest(
359
            multipart_type=MultipartType.OFPMP_FLOW,
360
            body=v0x04.FlowStatsRequest())
361
362
    @classmethod
363
    def listen(cls, switch, flows_stats):
364
        """Receive flow stats."""
365
        flow_class = FlowFactory.get_class(switch)
366
        for flow_stats in flows_stats:
367
            flow = flow_class.from_of_flow_stats(flow_stats, switch)
368
369
            # Update controller's flow
370
            controller_flow = switch.get_flow_by_id(flow.id)
371
            if controller_flow:
372
                controller_flow.stats = flow.stats
373
374
            # Update RRD database
375
            cls.rrd.update((switch.id, flow.id),
376
                           packet_count=flow.stats.packet_count,
377
                           byte_count=flow.stats.byte_count)
378