Test Failed
Pull Request — master (#42)
by Jose
03:22
created

build.stats.Stats._save_event_callback()   A

Complexity

Conditions 2

Size

Total Lines 7
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
cc 2
eloc 5
nop 4
dl 0
loc 7
rs 10
c 0
b 0
f 0
ccs 0
cts 1
cp 0
crap 6
1
"""Module with Classes to handle statistics."""
2
import time
3
from abc import ABCMeta, abstractmethod
4
from pathlib import Path
5
6
import pyof.v0x01.controller2switch.common as v0x01
7
import rrdtool
8
# v0x01 and v0x04 PortStats are version independent
9
from napps.kytos.of_core.flow import FlowFactory
10
from napps.kytos.of_core.flow import PortStats as OFCorePortStats
11
from napps.kytos.of_stats import settings
12
# pylint: disable=C0411,C0412
13
from pyof.v0x01.common.phy_port import Port
14
from pyof.v0x01.controller2switch.common import AggregateStatsRequest
15
from pyof.v0x01.controller2switch.stats_request import StatsRequest, StatsType
16
from pyof.v0x04.controller2switch import multipart_request as v0x04
17
from pyof.v0x04.controller2switch.common import MultipartType
18
from pyof.v0x04.controller2switch.multipart_request import MultipartRequest
19
20
from kytos.core import KytosEvent, log
21
22
23
class Stats(metaclass=ABCMeta):
24
    """Abstract class for Statistics implementation."""
25
26
    rrd = None
27
28
    def __init__(self, msg_out_buffer, msg_app_buffer):
29
        """Store a reference to the controller's msg_out buffer.
30
31
        Args:
32
            msg_out_buffer: Where to send events.
33
34
        """
35
        self._buffer = msg_out_buffer
36
        self._app_buffer = msg_app_buffer
37
38
    @abstractmethod
39
    def request(self, conn):
40
        """Request statistics."""
41
42
    @abstractmethod
43
    def listen(self, switch, stats):
44
        """Listen statistic replies."""
45
46
    def _send_event(self, req, conn):
47
        event = KytosEvent(
48
            name='kytos/of_stats.messages.out.ofpt_stats_request',
49
            content={'message': req, 'destination': conn})
50
        self._buffer.put(event)
51
52
    @classmethod
53
    def _save_event_callback(cls, _event, data, error):
54
        """Execute the callback to handle with kronos event to save data."""
55
        if error:
56
            log.error(f'Can\'t save stats in kytos/kronos: {error}')
57
58
        log.info(data)
59
60
61
class RRD:
62
    """Round-robin database for keeping stats.
63
64
    It store statistics every :data:`STATS_INTERVAL`.
65
    """
66
67
    def __init__(self, app_folder, data_sources):
68
        """Specify a folder to store RRDs.
69
70
        Args:
71
            app_folder (str): Parent folder for dpids folders.
72
            data_sources (iterable): Data source names (e.g. tx_bytes,
73
                rx_bytes).
74
75
        """
76
        self._app = app_folder
77
        self._ds = data_sources
78
79
    def update(self, index, tstamp=None, **ds_values):
80
        """Add a row to rrd file of *dpid* and *_id*.
81
82
        Args:
83
            dpid (str): Switch dpid.
84
            index (list of str): Index for the RRD database. Examples:
85
                [dpid], [dpid, port_no], [dpid, table id, flow hash].
86
            tstamp (str, int): Unix timestamp in seconds. Defaults to now.
87
88
        Create rrd if necessary.
89
90
        """
91
        if tstamp is None:
92
            tstamp = 'N'
93
        rrd = self.get_or_create_rrd(index)
94
        data = ':'.join(str(ds_values[ds]) for ds in self._ds)
95
        with settings.RRD_LOCK:
96
            rrdtool.update(rrd, '{}:{}'.format(tstamp, data))
97
98
    def get_rrd(self, index):
99
        """Return path of the RRD file for *dpid* with *basename*.
100
101
        If rrd doesn't exist, it is *not* created.
102
103
        Args:
104
            index (iterable of str): Index for the RRD database. Examples:
105
                [dpid], [dpid, port_no], [dpid, table id, flow hash].
106
107
        Returns:
108
            str: Absolute RRD path.
109
110
        See Also:
111
            :meth:`get_or_create_rrd`
112
113
        """
114
        path = settings.DIR / self._app
115
        folders, basename = index[:-1], index[-1]
116
        for folder in folders:
117
            path = path / folder
118
        path = path / '{}.rrd'.format(basename)
119
        return str(path)
120
121
    def get_or_create_rrd(self, index, tstamp=None):
122
        """If rrd is not found, create it.
123
124
        Args:
125
            index (list of str): Index for the RRD database. Examples:
126
                [dpid], [dpid, port_no], [dpid, table id, flow hash].
127
            tstamp (str, int): Value for start argument of RRD creation.
128
129
        """
130
        if tstamp is None:
131
            tstamp = 'N'
132
133
        rrd = self.get_rrd(index)
134
        if not Path(rrd).exists():
135
            log.debug('Creating rrd for app %s, index %s.', self._app, index)
136
            parent = Path(rrd).parent
137
            if not parent.exists():
138
                # We may have concurrency problems creating a folder
139
                parent.mkdir(parents=True, exist_ok=True)
140
            self.create_rrd(rrd, tstamp)
141
        return rrd
142
143
    def create_rrd(self, rrd, tstamp=None):
144
        """Create an RRD file.
145
146
        Args:
147
            rrd (str): Path of RRD file to be created.
148
            tstamp (str, int): Unix timestamp in seconds for RRD creation.
149
                Defaults to now.
150
151
        """
152
        def get_counter(ds_value):
153
            """Return a DS for rrd creation."""
154
            return 'DS:{}:COUNTER:{}:{}:{}'.format(ds_value, settings.TIMEOUT,
155
                                                   settings.MIN, settings.MAX)
156
157
        if tstamp is None:
158
            tstamp = 'N'
159
        options = [rrd, '--start', str(tstamp), '--step',
160
                   str(settings.STATS_INTERVAL)]
161
        options.extend([get_counter(ds) for ds in self._ds])
162
        options.extend(self._get_archives())
163
        with settings.RRD_LOCK:
164
            rrdtool.create(*options)
165
166
    # pylint: disable=R0914
167
    def fetch(self, index, start=None, end=None, n_points=None):
168
        """Fetch average values from rrd.
169
170
        Args:
171
            index (list of str): Index for the RRD database. Examples:
172
                [dpid], [dpid, port_no], [dpid, table id, flow hash].
173
            start (str, int): Unix timestamp in seconds for the first stats.
174
                Defaults to be old enough to have the latest n_points
175
                available (now - n_points * settings.STATS_INTERVAL).
176
            end (str, int): Unix timestamp in seconds for the last stats.
177
                Defaults to current time.
178
            n_points (int): Number of points to return. May return more if
179
                there is no matching resolution in the RRD file, or less if
180
                there is no records for all the time range.
181
                Defaults to as many points as possible.
182
183
        Returns:
184
            A tuple with:
185
186
            1. Iterator over timestamps
187
            2. Column (DS) names
188
            3. List of rows as tuples
189
190
        """
191
        rrd = self.get_rrd(index)
192
        if not Path(rrd).exists():
193
            msg = 'RRD for app {} and index {} not found'.format(self._app,
194
                                                                 index)
195
            raise FileNotFoundError(msg)
196
197
        # Use integers to calculate resolution
198
        start, end = self._calc_start_end(start, end, n_points, rrd)
199
200
        # Find the best matching resolution for returning n_points.
201
        res_args = []
202
        if n_points is not None and isinstance(start, int) \
203
                and isinstance(end, int):
204
            resolution = (end - start) // n_points
205
            if resolution > 0:
206
                res_args.extend(['-a', '-r', '{}s'.format(resolution)])
207
208
        args = [rrd, 'AVERAGE', '--start', str(start), '--end', str(end)]
209
        args.extend(res_args)
210
        with settings.RRD_LOCK:
211
            tstamps, cols, rows = rrdtool.fetch(*args)
212
        start, stop, step = tstamps
213
        # rrdtool range is different from Python's.
214
        return range(start + step, stop + 1, step), cols, rows
215
216
    @staticmethod
217
    def _calc_start_end(start, end, n_points, rrd):
218
        """Calculate start and end values for fetch command."""
219
        # Use integers to calculate resolution
220
        if end is None:
221
            end = int(time.time())
222
        if start is None:  # Latest n_points
223
            start = end - n_points * settings.STATS_INTERVAL
224
        elif start == 'first':  # Usually empty because 'first' is too old
225
            with settings.RRD_LOCK:
226
                start = rrdtool.first(rrd)
227
228
        # For RRDtool to include start and end timestamps.
229
        if isinstance(start, int):
230
            start -= 1
231
        if isinstance(end, int):
232
            end -= 1
233
234
        return start, end
235
236
    def fetch_latest(self, index):
237
        """Fetch only the value for now.
238
239
        Return zero values if there are no values recorded.
240
        """
241
        start = 'end-{}s'.format(settings.STATS_INTERVAL * 3)  # two rows
242
        try:
243
            tstamps, cols, rows = self.fetch(index, start, end='now')
244
        except FileNotFoundError:
245
            # No RRD for port, so it will return zero values
246
            return {}
247
        # Last rows may have future timestamp and be empty
248
        latest = None
249
        min_tstamp = int(time.time()) - settings.STATS_INTERVAL * 2
250
        # Search backwards for non-null values
251
        for tstamp, row in zip(tstamps[::-1], rows[::-1]):
252
            if row[0] is not None and tstamp > min_tstamp:
253
                latest = row
254
        # If no values are found, add zeros.
255
        if not latest:
256
            latest = [0] * len(cols)
257
        return dict(zip(cols, latest))
258
259
    @classmethod
260
    def _get_archives(cls):
261
        """Averaged for all Data Sources."""
262
        averages = []
263
        # One month stats for the following periods:
264
        for steps in ('1m', '2m', '4m', '8m', '15m', '30m', '1h', '2h', '4h'
265
                      '8h', '12h', '1d', '2d', '3d', '6d', '10d', '15d'):
266
            averages.append('RRA:AVERAGE:{}:{}:{}'.format(settings.XFF, steps,
267
                                                          settings.PERIOD))
268
        # averages = ['RRA:AVERAGE:0:1:1d']  # More samples for testing
269
        return averages
270
271
272
class PortStats(Stats):
273
    """Deal with PortStats messages."""
274
275
    rrd = RRD('ports', [rt + 'x_' + stat for stat in
276
                        ('bytes', 'dropped', 'errors') for rt in 'rt'])
277
278
    def request(self, conn):
279
        """Ask for port stats."""
280
        request = self._get_versioned_request(conn.protocol.version)
281
        self._send_event(request, conn)
282
        log.debug('PortStats request for switch %s sent.', conn.switch.id)
283
284
    @staticmethod
285
    def _get_versioned_request(of_version):
286
        if of_version == 0x01:
287
            return StatsRequest(
288
                body_type=StatsType.OFPST_PORT,
289
                body=v0x01.PortStatsRequest(Port.OFPP_NONE))  # All ports
290
        return MultipartRequest(
291
            multipart_type=MultipartType.OFPMP_PORT_STATS,
292
            body=v0x04.PortStatsRequest())
293
294
    @classmethod
295
    def listen(cls, switch, ports_stats):
296
        """Receive port stats."""
297
        debug_msg = 'Received port %s stats of switch %s: rx_bytes %s,' \
298
                    ' tx_bytes %s, rx_dropped %s, tx_dropped %s,' \
299
                    ' rx_errors %s, tx_errors %s'
300
301
        for port_stat in ports_stats:
302
            cls._update_controller_interface(switch, port_stat)
303
            cls.rrd.update((switch.id, port_stat.port_no.value),
304
                           rx_bytes=port_stat.rx_bytes.value,
305
                           tx_bytes=port_stat.tx_bytes.value,
306
                           rx_dropped=port_stat.rx_dropped.value,
307
                           tx_dropped=port_stat.tx_dropped.value,
308
                           rx_errors=port_stat.rx_errors.value,
309
                           tx_errors=port_stat.tx_errors.value)
310
311
            log.debug(debug_msg, port_stat.port_no.value, switch.id,
312
                      port_stat.rx_bytes.value, port_stat.tx_bytes.value,
313
                      port_stat.rx_dropped.value, port_stat.tx_dropped.value,
314
                      port_stat.rx_errors.value, port_stat.tx_errors.value)
315
316
    @staticmethod
317
    def _update_controller_interface(switch, port_stats):
318
        port_no = port_stats.port_no.value
319
        iface = switch.get_interface_by_port_no(port_no)
320
        if iface is not None:
321
            if iface.stats is None:
322
                iface.stats = OFCorePortStats()
323
            iface.stats.update(port_stats)
324
325
326
class AggregateStats(Stats):
327
    """Deal with AggregateStats message."""
328
329
    _rrd = RRD('aggr', ('packet_count', 'byte_count', 'flow_count'))
330
331
    def request(self, conn):
332
        """Ask for flow stats."""
333
        body = AggregateStatsRequest()  # Port.OFPP_NONE and All Tables
334
        req = StatsRequest(body_type=StatsType.OFPST_AGGREGATE, body=body)
335
        self._send_event(req, conn)
336
        log.debug('Aggregate Stats request for switch %s sent.',
337
                  conn.switch.dpid)
338
339
    @classmethod
340
    def listen(cls, switch, aggregate_stats):
341
        """Receive flow stats."""
342
        debug_msg = 'Received aggregate stats from switch {}:' \
343
                    ' packet_count {}, byte_count {}, flow_count {}'
344
345
        for aggregate in aggregate_stats:
346
            # need to choose the _id to aggregate_stats
347
            # this class isn't used yet.
348
            cls.rrd.update((switch.id,),
349
                           packet_count=aggregate.packet_count.value,
350
                           byte_count=aggregate.byte_count.value,
351
                           flow_count=aggregate.flow_count.value)
352
353
            log.debug(debug_msg, switch.id, aggregate.packet_count.value,
354
                      aggregate.byte_count.value, aggregate.flow_count.value)
355
356
357
class FlowStats(Stats):
358
    """Deal with FlowStats message."""
359
360
    rrd = RRD('flows', ('packet_count', 'byte_count'))
361
362
    def request(self, conn):
363
        """Ask for flow stats."""
364
        request = self._get_versioned_request(conn.protocol.version)
365
        self._send_event(request, conn)
366
        log.debug('FlowStats request for switch %s sent.', conn.switch.id)
367
368
    @staticmethod
369
    def _get_versioned_request(of_version):
370
        if of_version == 0x01:
371
            return StatsRequest(
372
                body_type=StatsType.OFPST_FLOW,
373
                body=v0x01.FlowStatsRequest())
374
        return MultipartRequest(
375
            multipart_type=MultipartType.OFPMP_FLOW,
376
            body=v0x04.FlowStatsRequest())
377
378
    def listen(self, switch, flows_stats):
379
        """Receive flow stats."""
380
        flow_class = FlowFactory.get_class(switch)
381
        for flow_stat in flows_stats:
382
            flow = flow_class.from_of_flow_stats(flow_stat, switch)
383
384
            # Update controller's flow
385
            controller_flow = switch.get_flow_by_id(flow.id)
386
            if controller_flow:
387
                controller_flow.stats = flow.stats
388
389
            # Save packet_count using kytos/kronos
390
            namespace = (f'kytos.kronos.{switch.id}.flow_id.{flow.id}'
391
                         '.packet_count')
392
            content = {'namespace': namespace,
393
                       'value': flow.stats.packet_count,
394
                       'callback': self._save_event_callback}
395
396
            event = KytosEvent(name='kytos.kronos.save', content=content)
397
            self._app_buffer.put(event)
398
399
            # Save byte_count using kytos/kronos
400
            namespace = (f'kytos.kronos.{switch.id}.flow_id.{flow.id}'
401
                         '.byte_count')
402
            content = {'namespace': namespace,
403
                       'value': flow.stats.byte_count,
404
                       'callback': self._save_event_callback}
405
406
            event = KytosEvent(name='kytos.kronos.save', content=content)
407
            self._app_buffer.put(event)
408