Passed
Pull Request — master (#42)
by Jose
02:30
created

build.stats.FlowStats.request()   A

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.4218

Importance

Changes 0
Metric Value
cc 1
eloc 4
nop 2
dl 0
loc 5
ccs 1
cts 4
cp 0.25
crap 1.4218
rs 10
c 0
b 0
f 0
1
"""Module with Classes to handle statistics."""
2 1
import time
3 1
from abc import ABCMeta, abstractmethod
4 1
from pathlib import Path
5
6 1
import pyof.v0x01.controller2switch.common as v0x01
7 1
import rrdtool
8
# pylint: disable=C0411,C0412
9 1
from pyof.v0x01.common.phy_port import Port
10 1
from pyof.v0x01.controller2switch.common import AggregateStatsRequest
11 1
from pyof.v0x01.controller2switch.stats_request import StatsRequest, StatsType
12 1
from pyof.v0x04.controller2switch import multipart_request as v0x04
13 1
from pyof.v0x04.controller2switch.common import MultipartType
14 1
from pyof.v0x04.controller2switch.multipart_request import MultipartRequest
15
16 1
from kytos.core import KytosEvent, log
17
# v0x01 and v0x04 PortStats are version independent
18 1
from napps.kytos.of_core.flow import FlowFactory
19 1
from napps.kytos.of_core.flow import PortStats as OFCorePortStats
20 1
from napps.kytos.of_stats import settings
21
22
23 1
class Stats(metaclass=ABCMeta):
    """Common base for the statistics handlers of this module.

    Concrete handlers must implement :meth:`request` and :meth:`listen`.
    """

    rrd = None

    def __init__(self, msg_out_buffer, msg_app_buffer):
        """Keep references to the buffers used to publish events.

        Args:
            msg_out_buffer: Buffer that receives the stats request events.
            msg_app_buffer: Buffer for events addressed to other NApps.

        """
        self._app_buffer = msg_app_buffer
        self._buffer = msg_out_buffer

    @abstractmethod
    def request(self, conn):
        """Request statistics."""

    @abstractmethod
    def listen(self, switch, stats):
        """Listen statistic replies."""

    def _send_event(self, req, conn):
        """Wrap *req* in a stats request event and queue it for *conn*."""
        payload = {'message': req, 'destination': conn}
        self._buffer.put(KytosEvent(
            name='kytos/of_stats.messages.out.ofpt_stats_request',
            content=payload))

    @classmethod
    def _save_event_callback(cls, _event, data, error):
        """Log the result of a save request sent to kytos/kronos.

        Args:
            _event: Ignored.
            data: Logged at debug level on every call.
            error: When truthy, it is logged as an error first.

        """
        if error:
            log.error(f'Can\'t save stats in kytos/kronos: {error}')
        log.debug(data)
59
60
61 1
class RRD:
    """Round-robin database for keeping stats.

    It stores statistics every :data:`STATS_INTERVAL`.
    """

    # Consolidation periods of the AVERAGE archives built by _get_archives.
    # Every entry needs its own comma: adjacent string literals are silently
    # concatenated by Python (e.g. '4h' '8h' becomes '4h8h').
    _ARCHIVE_STEPS = ('1m', '2m', '4m', '8m', '15m', '30m', '1h', '2h', '4h',
                      '8h', '12h', '1d', '2d', '3d', '6d', '10d', '15d')

    def __init__(self, app_folder, data_sources):
        """Specify a folder to store RRDs.

        Args:
            app_folder (str): Parent folder for dpids folders.
            data_sources (iterable): Data source names (e.g. tx_bytes,
                rx_bytes).

        """
        self._app = app_folder
        self._ds = data_sources

    def update(self, index, tstamp=None, **ds_values):
        """Add a row to the RRD file identified by *index*.

        The RRD file is created first if it does not exist.

        Args:
            index (list of str): Index for the RRD database. Examples:
                [dpid], [dpid, port_no], [dpid, table id, flow hash].
            tstamp (str, int): Unix timestamp in seconds. Defaults to now.
            **ds_values: One value for each data source name given to the
                constructor.

        Raises:
            KeyError: If *ds_values* lacks one of the data source names.

        """
        if tstamp is None:
            tstamp = 'N'
        rrd = self.get_or_create_rrd(index)
        # Values are serialized in the order the data sources were declared.
        data = ':'.join(str(ds_values[ds]) for ds in self._ds)
        with settings.RRD_LOCK:
            rrdtool.update(rrd, '{}:{}'.format(tstamp, data))

    def get_rrd(self, index):
        """Return the path of the RRD file for *index*.

        If rrd doesn't exist, it is *not* created.

        Args:
            index (iterable of str): Index for the RRD database. Examples:
                [dpid], [dpid, port_no], [dpid, table id, flow hash].
                The last item is the file basename, the others are folders.

        Returns:
            str: RRD path under ``settings.DIR / app_folder``.

        See Also:
            :meth:`get_or_create_rrd`

        """
        path = settings.DIR / self._app
        folders, basename = index[:-1], index[-1]
        for folder in folders:
            path = path / folder
        path = path / '{}.rrd'.format(basename)
        return str(path)

    def get_or_create_rrd(self, index, tstamp=None):
        """Return the RRD path for *index*, creating the file if needed.

        Args:
            index (list of str): Index for the RRD database. Examples:
                [dpid], [dpid, port_no], [dpid, table id, flow hash].
            tstamp (str, int): Value for start argument of RRD creation.

        Returns:
            str: Path of the RRD file.

        """
        if tstamp is None:
            tstamp = 'N'

        rrd = self.get_rrd(index)
        rrd_path = Path(rrd)
        if not rrd_path.exists():
            log.debug('Creating rrd for app %s, index %s.', self._app, index)
            # exist_ok tolerates another thread creating the folder first.
            rrd_path.parent.mkdir(parents=True, exist_ok=True)
            self.create_rrd(rrd, tstamp)
        return rrd

    def create_rrd(self, rrd, tstamp=None):
        """Create an RRD file.

        Args:
            rrd (str): Path of RRD file to be created.
            tstamp (str, int): Unix timestamp in seconds for RRD creation.
                Defaults to now.

        """
        def get_counter(ds_value):
            """Return a DS for rrd creation."""
            return 'DS:{}:COUNTER:{}:{}:{}'.format(ds_value, settings.TIMEOUT,
                                                   settings.MIN, settings.MAX)

        if tstamp is None:
            tstamp = 'N'
        options = [rrd, '--start', str(tstamp), '--step',
                   str(settings.STATS_INTERVAL)]
        options.extend([get_counter(ds) for ds in self._ds])
        options.extend(self._get_archives())
        with settings.RRD_LOCK:
            rrdtool.create(*options)

    # pylint: disable=R0914
    def fetch(self, index, start=None, end=None, n_points=None):
        """Fetch average values from rrd.

        Args:
            index (list of str): Index for the RRD database. Examples:
                [dpid], [dpid, port_no], [dpid, table id, flow hash].
            start (str, int): Unix timestamp in seconds for the first stats.
                Defaults to be old enough to have the latest n_points
                available (now - n_points * settings.STATS_INTERVAL), or to
                the first timestamp of the RRD when n_points is not given.
            end (str, int): Unix timestamp in seconds for the last stats.
                Defaults to current time.
            n_points (int): Number of points to return. May return more if
                there is no matching resolution in the RRD file, or less if
                there is no records for all the time range.
                Defaults to as many points as possible.

        Returns:
            A tuple with:

            1. Range of timestamps
            2. Column (DS) names
            3. List of rows as tuples

        Raises:
            FileNotFoundError: If there is no RRD file for *index*.

        """
        rrd = self.get_rrd(index)
        if not Path(rrd).exists():
            msg = 'RRD for app {} and index {} not found'.format(self._app,
                                                                 index)
            raise FileNotFoundError(msg)

        # Use integers to calculate resolution
        start, end = self._calc_start_end(start, end, n_points, rrd)

        # Find the best matching resolution for returning n_points.
        # A falsy n_points (None or 0) means no resolution hint; checking
        # truthiness also avoids dividing by zero.
        res_args = []
        if n_points and isinstance(start, int) and isinstance(end, int):
            resolution = (end - start) // n_points
            if resolution > 0:
                res_args.extend(['-a', '-r', '{}s'.format(resolution)])

        args = [rrd, 'AVERAGE', '--start', str(start), '--end', str(end)]
        args.extend(res_args)
        with settings.RRD_LOCK:
            tstamps, cols, rows = rrdtool.fetch(*args)
        start, stop, step = tstamps
        # rrdtool range is different from Python's.
        return range(start + step, stop + 1, step), cols, rows

    @staticmethod
    def _calc_start_end(start, end, n_points, rrd):
        """Calculate start and end values for fetch command.

        Args:
            start (str, int): Requested start, ``'first'`` or None.
            end (str, int): Requested end or None (current time).
            n_points (int): Requested number of points or None.
            rrd (str): RRD path, only read when the first timestamp of the
                file is needed.

        Returns:
            tuple: ``(start, end)``. Integer values are decremented by one;
            other values (e.g. rrdtool time expressions) are unchanged.

        """
        # Use integers to calculate resolution
        if end is None:
            end = int(time.time())
        if start is None and n_points is not None:  # Latest n_points
            start = end - n_points * settings.STATS_INTERVAL
        elif start is None or start == 'first':
            # Usually empty because 'first' is too old. Also used when
            # neither start nor n_points is given, which used to raise
            # TypeError (None * int).
            with settings.RRD_LOCK:
                start = rrdtool.first(rrd)

        # For RRDtool to include start and end timestamps.
        if isinstance(start, int):
            start -= 1
        if isinstance(end, int):
            end -= 1

        return start, end

    def fetch_latest(self, index):
        """Fetch only the value for now.

        Returns:
            dict: Data source names mapped to the most recent recorded
            values. Values are zero when the RRD exists but has no recent
            record. The dict is empty when there is no RRD for *index*.

        """
        start = 'end-{}s'.format(settings.STATS_INTERVAL * 3)  # two rows
        try:
            tstamps, cols, rows = self.fetch(index, start, end='now')
        except FileNotFoundError:
            # No RRD for this index: there are no columns to report.
            return {}
        # Last rows may have future timestamp and be empty
        min_tstamp = int(time.time()) - settings.STATS_INTERVAL * 2
        latest = self._pick_latest_row(tstamps, rows, min_tstamp)
        # If no values are found, add zeros.
        if latest is None:
            latest = [0] * len(cols)
        return dict(zip(cols, latest))

    @staticmethod
    def _pick_latest_row(tstamps, rows, min_tstamp):
        """Return the newest non-null row more recent than *min_tstamp*.

        Args:
            tstamps (sequence of int): Timestamps in ascending order.
            rows (sequence): Rows matching *tstamps*.
            min_tstamp (int): Rows at or before this timestamp are ignored.

        Returns:
            The selected row or None if no row qualifies.

        """
        # Search backwards and stop at the first hit, which is the newest.
        for tstamp, row in zip(tstamps[::-1], rows[::-1]):
            if row[0] is not None and tstamp > min_tstamp:
                return row
        return None

    @classmethod
    def _get_archives(cls):
        """Return one AVERAGE archive (RRA) definition per period.

        The archives apply to all data sources; xff and retention come from
        ``settings.XFF`` and ``settings.PERIOD``.
        """
        return ['RRA:AVERAGE:{}:{}:{}'.format(settings.XFF, steps,
                                              settings.PERIOD)
                for steps in cls._ARCHIVE_STEPS]
270
271
272 1
class PortStats(Stats):
    """Handle port statistics requests and replies."""

    def request(self, conn):
        """Send a port stats request through *conn*."""
        message = self._get_versioned_request(conn.protocol.version)
        self._send_event(message, conn)
        log.debug('PortStats request for switch %s sent.', conn.switch.id)

    @staticmethod
    def _get_versioned_request(of_version):
        """Build the port stats request message for *of_version*.

        Version 0x01 gets a ``StatsRequest``; any other version gets a
        ``MultipartRequest``.
        """
        if of_version != 0x01:
            return MultipartRequest(
                multipart_type=MultipartType.OFPMP_PORT_STATS,
                body=v0x04.PortStatsRequest())
        # Port.OFPP_NONE requests all ports
        return StatsRequest(
            body_type=StatsType.OFPST_PORT,
            body=v0x01.PortStatsRequest(Port.OFPP_NONE))

    def listen(self, switch, ports_stats):
        """Process a port stats reply.

        For every item of *ports_stats*, update the matching interface of
        *switch* and queue a ``kytos.kronos.save`` event with its counters.
        """
        counters = ('rx_bytes', 'tx_bytes', 'rx_dropped', 'tx_dropped',
                    'rx_errors', 'tx_errors')
        debug_msg = ('Received port %s stats of switch %s: rx_bytes %s,'
                     ' tx_bytes %s, rx_dropped %s, tx_dropped %s,'
                     ' rx_errors %s, tx_errors %s')

        for port_stat in ports_stats:
            self._update_controller_interface(switch, port_stat)

            # Read each counter once; reused by the event and the log line.
            readings = tuple(getattr(port_stat, name).value
                             for name in counters)
            port_no = port_stat.port_no.value

            namespace = f'kytos.kronos.{switch.id}.port_no.{port_no}'
            save_event = KytosEvent(
                name='kytos.kronos.save',
                content={'namespace': namespace,
                         'value': dict(zip(counters, readings)),
                         'callback': self._save_event_callback,
                         'timestamp': time.time()})
            self._app_buffer.put(save_event)

            log.debug(debug_msg, port_no, switch.id, *readings)

    @staticmethod
    def _update_controller_interface(switch, port_stats):
        """Copy *port_stats* into the stats of the matching interface.

        Nothing is done when *switch* has no interface for the port number.
        """
        iface = switch.get_interface_by_port_no(port_stats.port_no.value)
        if iface is None:
            return
        if iface.stats is None:
            iface.stats = OFCorePortStats()
        iface.stats.update(port_stats)
330
331
332 1
class AggregateStats(Stats):
    """Deal with AggregateStats message."""

    _rrd = RRD('aggr', ('packet_count', 'byte_count', 'flow_count'))

    def request(self, conn):
        """Ask for aggregate stats.

        Only an OpenFlow v0x01 ``StatsRequest`` is built here.
        """
        body = AggregateStatsRequest()  # Port.OFPP_NONE and All Tables
        req = StatsRequest(body_type=StatsType.OFPST_AGGREGATE, body=body)
        self._send_event(req, conn)
        log.debug('Aggregate Stats request for switch %s sent.',
                  conn.switch.dpid)

    def listen(self, switch, aggregate_stats):
        """Receive aggregate stats.

        Queue one ``kytos.kronos.save`` event for each item of
        *aggregate_stats*.
        """
        # The logger formats lazily with the % operator, so the placeholders
        # must be %s; '{}' placeholders make the record fail to format.
        debug_msg = 'Received aggregate stats from switch %s:' \
                    ' packet_count %s, byte_count %s, flow_count %s'

        for aggregate in aggregate_stats:
            # need to choose the _id to aggregate_stats
            # this class isn't used yet.

            log.debug(debug_msg, switch.id, aggregate.packet_count.value,
                      aggregate.byte_count.value, aggregate.flow_count.value)

            # Save aggregate stats using kytos/kronos
            namespace = f'kytos.kronos.aggregated_stats.{switch.id}'
            # NOTE(review): assumes the reply object exposes ``id`` - confirm
            # before enabling this class.
            stats_to_send = {'aggregate_id': aggregate.id,
                             'packet_count': aggregate.packet_count.value,
                             'byte_count': aggregate.byte_count.value,
                             'flow_count': aggregate.flow_count.value}

            content = {'namespace': namespace,
                       'value': stats_to_send,
                       'callback': self._save_event_callback,
                       'timestamp': time.time()}

            event = KytosEvent(name='kytos.kronos.save', content=content)
            self._app_buffer.put(event)
371
372
373 1
class FlowStats(Stats):
    """Handle flow statistics requests and replies."""

    def request(self, conn):
        """Send a flow stats request through *conn*."""
        message = self._get_versioned_request(conn.protocol.version)
        self._send_event(message, conn)
        log.debug('FlowStats request for switch %s sent.', conn.switch.id)

    @staticmethod
    def _get_versioned_request(of_version):
        """Build the flow stats request message for *of_version*.

        Version 0x01 gets a ``StatsRequest``; any other version gets a
        ``MultipartRequest``.
        """
        if of_version != 0x01:
            return MultipartRequest(
                multipart_type=MultipartType.OFPMP_FLOW,
                body=v0x04.FlowStatsRequest())
        return StatsRequest(
            body_type=StatsType.OFPST_FLOW,
            body=v0x01.FlowStatsRequest())

    def listen(self, switch, flows_stats):
        """Process a flow stats reply.

        For every item of *flows_stats*, refresh the stats of the matching
        flow known by *switch* (if any) and queue two ``kytos.kronos.save``
        events: packet_count first, then byte_count.
        """
        flow_class = FlowFactory.get_class(switch)
        for flow_stat in flows_stats:
            flow = flow_class.from_of_flow_stats(flow_stat, switch)

            # Update controller's flow
            known_flow = switch.get_flow_by_id(flow.id)
            if known_flow:
                known_flow.stats = flow.stats

            # Save each counter in its own kytos/kronos event
            for counter in ('packet_count', 'byte_count'):
                namespace = f'kytos.kronos.{switch.id}.flow_id.{flow.id}'
                content = {'namespace': namespace,
                           'value': {counter: getattr(flow.stats, counter)},
                           'callback': self._save_event_callback,
                           'timestamp': time.time()}
                self._app_buffer.put(
                    KytosEvent(name='kytos.kronos.save', content=content))
422