Passed
Pull Request — master (#42)
by Jose
03:27
created

build.stats.RRD.__init__()   A

Complexity

Conditions 1

Size

Total Lines 11
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 3
dl 0
loc 11
ccs 3
cts 3
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
"""Module with Classes to handle statistics."""
2 1
import time
3 1
from abc import ABCMeta, abstractmethod
4 1
from pathlib import Path
5
6 1
import pyof.v0x01.controller2switch.common as v0x01
7 1
import rrdtool
8
# pylint: disable=C0411,C0412
9 1
from pyof.v0x01.common.phy_port import Port
10 1
from pyof.v0x01.controller2switch.common import AggregateStatsRequest
11 1
from pyof.v0x01.controller2switch.stats_request import StatsRequest, StatsType
12 1
from pyof.v0x04.controller2switch import multipart_request as v0x04
13 1
from pyof.v0x04.controller2switch.common import MultipartType
14 1
from pyof.v0x04.controller2switch.multipart_request import MultipartRequest
15
16 1
from kytos.core import KytosEvent, log
17
# v0x01 and v0x04 PortStats are version independent
18 1
from napps.kytos.of_core.flow import FlowFactory
19 1
from napps.kytos.of_core.flow import PortStats as OFCorePortStats
20 1
from napps.kytos.of_stats import settings
21
22
23 1
class Stats(metaclass=ABCMeta):
    """Base class shared by the statistics handlers.

    Concrete handlers must implement :meth:`request` and :meth:`listen`.
    """

    # No round-robin database is attached by default.
    rrd = None

    def __init__(self, msg_out_buffer, msg_app_buffer):
        """Keep references to the two buffers used to emit events.

        Args:
            msg_out_buffer: Buffer that receives the stats request events
                built by :meth:`_send_event`.
            msg_app_buffer: Buffer stored as ``_app_buffer`` for the
                subclasses to put their own events in.

        """
        self._buffer, self._app_buffer = msg_out_buffer, msg_app_buffer

    @abstractmethod
    def request(self, conn):
        """Request statistics."""

    @abstractmethod
    def listen(self, switch, stats):
        """Listen statistic replies."""

    def _send_event(self, req, conn):
        """Wrap *req* in a stats request event and queue it for *conn*."""
        payload = {'message': req, 'destination': conn}
        request_event = KytosEvent(
            name='kytos/of_stats.messages.out.ofpt_stats_request',
            content=payload)
        self._buffer.put(request_event)

    @classmethod
    def _save_event_callback(cls, _event, data, error):
        """Log the outcome of a kytos/kronos save request.

        Args:
            _event: Unused.
            data: Always logged at info level.
            error: When truthy, reported at error level before *data*.

        """
        if error:
            log.error(f"Can't save stats in kytos/kronos: {error}")

        log.info(data)
59
60
61 1
class RRD:
    """Round-robin database for keeping stats.

    It stores statistics every :data:`STATS_INTERVAL`.
    """

    # Consolidation steps, one AVERAGE archive each (see _get_archives).
    # Every item needs its own comma: adjacent string literals are silently
    # concatenated ('4h' '8h' would become the single bogus step '4h8h').
    _ARCHIVE_STEPS = ('1m', '2m', '4m', '8m', '15m', '30m', '1h', '2h', '4h',
                      '8h', '12h', '1d', '2d', '3d', '6d', '10d', '15d')

    def __init__(self, app_folder, data_sources):
        """Specify a folder to store RRDs.

        Args:
            app_folder (str): Parent folder for dpids folders.
            data_sources (iterable): Data source names (e.g. tx_bytes,
                rx_bytes).

        """
        self._app = app_folder
        self._ds = data_sources

    def update(self, index, tstamp=None, **ds_values):
        """Add a row to the rrd file identified by *index*.

        Create rrd if necessary.

        Args:
            index (list of str): Index for the RRD database. Examples:
                [dpid], [dpid, port_no], [dpid, table id, flow hash].
            tstamp (str, int): Unix timestamp in seconds. Defaults to now.
            **ds_values: One value per data source name given to the
                constructor; extra names are ignored.

        Raises:
            KeyError: If a data source has no value in *ds_values*.

        """
        if tstamp is None:
            tstamp = 'N'
        rrd = self.get_or_create_rrd(index)
        # Values are written in the same order the data sources were declared.
        data = ':'.join(str(ds_values[ds]) for ds in self._ds)
        with settings.RRD_LOCK:
            rrdtool.update(rrd, '{}:{}'.format(tstamp, data))

    def get_rrd(self, index):
        """Return the path of the RRD file for *index*.

        If rrd doesn't exist, it is *not* created.

        Args:
            index (sequence of str): Index for the RRD database. Examples:
                [dpid], [dpid, port_no], [dpid, table id, flow hash].
                All items but the last are folders; the last one is the
                file name without the ``.rrd`` extension.

        Returns:
            str: RRD path under ``settings.DIR`` and the app folder.

        See Also:
            :meth:`get_or_create_rrd`

        """
        path = settings.DIR / self._app
        folders, basename = index[:-1], index[-1]
        for folder in folders:
            path = path / folder
        path = path / '{}.rrd'.format(basename)
        return str(path)

    def get_or_create_rrd(self, index, tstamp=None):
        """If rrd is not found, create it.

        Args:
            index (list of str): Index for the RRD database. Examples:
                [dpid], [dpid, port_no], [dpid, table id, flow hash].
            tstamp (str, int): Value for start argument of RRD creation.

        Returns:
            str: Path of the RRD file.

        """
        if tstamp is None:
            tstamp = 'N'

        rrd = self.get_rrd(index)
        if not Path(rrd).exists():
            log.debug('Creating rrd for app %s, index %s.', self._app, index)
            parent = Path(rrd).parent
            if not parent.exists():
                # We may have concurrency problems creating a folder
                parent.mkdir(parents=True, exist_ok=True)
            self.create_rrd(rrd, tstamp)
        return rrd

    def create_rrd(self, rrd, tstamp=None):
        """Create an RRD file.

        The file gets one COUNTER data source per name given to the
        constructor and the archives returned by :meth:`_get_archives`.

        Args:
            rrd (str): Path of RRD file to be created.
            tstamp (str, int): Unix timestamp in seconds for RRD creation.
                Defaults to now.

        """
        def get_counter(ds_value):
            """Return a DS for rrd creation."""
            return 'DS:{}:COUNTER:{}:{}:{}'.format(ds_value, settings.TIMEOUT,
                                                   settings.MIN, settings.MAX)

        if tstamp is None:
            tstamp = 'N'
        options = [rrd, '--start', str(tstamp), '--step',
                   str(settings.STATS_INTERVAL)]
        options.extend([get_counter(ds) for ds in self._ds])
        options.extend(self._get_archives())
        with settings.RRD_LOCK:
            rrdtool.create(*options)

    # pylint: disable=R0914
    def fetch(self, index, start=None, end=None, n_points=None):
        """Fetch average values from rrd.

        Args:
            index (list of str): Index for the RRD database. Examples:
                [dpid], [dpid, port_no], [dpid, table id, flow hash].
            start (str, int): Unix timestamp in seconds for the first stats.
                Defaults to be old enough to have the latest n_points
                available (now - n_points * settings.STATS_INTERVAL), so
                *n_points* is required when *start* is omitted.
            end (str, int): Unix timestamp in seconds for the last stats.
                Defaults to current time.
            n_points (int): Number of points to return. May return more if
                there is no matching resolution in the RRD file, or less if
                there is no records for all the time range.
                Defaults to as many points as possible.

        Returns:
            A tuple with:

            1. Iterator over timestamps
            2. Column (DS) names
            3. List of rows as tuples

        Raises:
            FileNotFoundError: If there is no RRD file for *index*.

        """
        rrd = self.get_rrd(index)
        if not Path(rrd).exists():
            msg = 'RRD for app {} and index {} not found'.format(self._app,
                                                                 index)
            raise FileNotFoundError(msg)

        # Use integers to calculate resolution
        start, end = self._calc_start_end(start, end, n_points, rrd)

        # Find the best matching resolution for returning n_points.
        # Only possible when both limits are integers (not 'now', 'end-30s').
        res_args = []
        if n_points is not None and isinstance(start, int) \
                and isinstance(end, int):
            resolution = (end - start) // n_points
            if resolution > 0:
                res_args.extend(['-a', '-r', '{}s'.format(resolution)])

        args = [rrd, 'AVERAGE', '--start', str(start), '--end', str(end)]
        args.extend(res_args)
        with settings.RRD_LOCK:
            tstamps, cols, rows = rrdtool.fetch(*args)
        start, stop, step = tstamps
        # rrdtool range is different from Python's.
        return range(start + step, stop + 1, step), cols, rows

    @staticmethod
    def _calc_start_end(start, end, n_points, rrd):
        """Calculate start and end values for fetch command.

        Args:
            start (str, int): ``None`` for the latest *n_points*, ``'first'``
                for the first timestamp of the file, or an explicit value.
            end (str, int): ``None`` means the current time.
            n_points (int): Only read when *start* is ``None``, in which case
                it must be a number.
            rrd (str): Path of the RRD file, only read for ``'first'``.

        Returns:
            tuple: ``(start, end)``. Integer values are decremented by one;
            any other value is returned untouched.

        """
        # Use integers to calculate resolution
        if end is None:
            end = int(time.time())
        if start is None:  # Latest n_points
            start = end - n_points * settings.STATS_INTERVAL
        elif start == 'first':  # Usually empty because 'first' is too old
            with settings.RRD_LOCK:
                start = rrdtool.first(rrd)

        # For RRDtool to include start and end timestamps.
        if isinstance(start, int):
            start -= 1
        if isinstance(end, int):
            end -= 1

        return start, end

    @staticmethod
    def _latest_row(tstamps, rows, min_tstamp):
        """Return the newest row holding data more recent than *min_tstamp*.

        Args:
            tstamps (sequence of int): Timestamps, oldest first.
            rows (sequence): Rows matching *tstamps* one to one.
            min_tstamp (int): Rows at or before this timestamp are ignored.

        Returns:
            The matching row, or None if there is none.

        """
        # Search backwards and stop at the first hit: it is the newest one.
        for tstamp, row in zip(tstamps[::-1], rows[::-1]):
            if row[0] is not None and tstamp > min_tstamp:
                return row
        return None

    def fetch_latest(self, index):
        """Fetch only the value for now.

        Returns:
            dict: Data source names mapped to the values of the most recent
            non-null row of the last two stats intervals, or to zeros if
            there is no such row. Empty dict if there is no RRD file for
            *index*.

        """
        start = 'end-{}s'.format(settings.STATS_INTERVAL * 3)  # two rows
        try:
            tstamps, cols, rows = self.fetch(index, start, end='now')
        except FileNotFoundError:
            # No RRD for this index, so there are no columns to report.
            return {}
        # Last rows may have future timestamp and be empty
        min_tstamp = int(time.time()) - settings.STATS_INTERVAL * 2
        latest = self._latest_row(tstamps, rows, min_tstamp)
        # If no values are found, add zeros.
        if not latest:
            latest = [0] * len(cols)
        return dict(zip(cols, latest))

    @classmethod
    def _get_archives(cls):
        """Return one AVERAGE archive (RRA) definition per archive step.

        The archives are shared by all data sources. ``settings.XFF`` and
        ``settings.PERIOD`` fill the remaining fields of each definition.
        """
        return ['RRA:AVERAGE:{}:{}:{}'.format(settings.XFF, steps,
                                              settings.PERIOD)
                for steps in cls._ARCHIVE_STEPS]
270
271
272 1
class PortStats(Stats):
    """Deal with PortStats messages."""

    def request(self, conn):
        """Ask for port stats.

        Args:
            conn: Connection to the switch; its protocol version selects the
                request type.

        """
        request = self._get_versioned_request(conn.protocol.version)
        self._send_event(request, conn)
        log.debug('PortStats request for switch %s sent.', conn.switch.id)

    @staticmethod
    def _get_versioned_request(of_version):
        """Build the port stats request for the given OpenFlow version.

        Version 0x01 gets a ``StatsRequest``; any other version gets a
        v0x04 ``MultipartRequest``.
        """
        if of_version == 0x01:
            return StatsRequest(
                body_type=StatsType.OFPST_PORT,
                body=v0x01.PortStatsRequest(Port.OFPP_NONE))  # All ports
        return MultipartRequest(
            multipart_type=MultipartType.OFPMP_PORT_STATS,
            body=v0x04.PortStatsRequest())

    def listen(self, switch, ports_stats):
        """Receive port stats.

        For each port, update the stats of the controller's interface and
        put one ``kytos.kronos.save`` event per counter in the app buffer.

        Args:
            switch: Switch that sent the stats.
            ports_stats (iterable): Port stats of the reply.

        """
        debug_msg = 'Received port %s stats of switch %s: rx_bytes %s,' \
                    ' tx_bytes %s, rx_dropped %s, tx_dropped %s,' \
                    ' rx_errors %s, tx_errors %s'

        for port_stat in ports_stats:
            self._update_controller_interface(switch, port_stat)

            statistics_to_send = [('rx_bytes', port_stat.rx_bytes.value),
                                  ('tx_bytes', port_stat.tx_bytes.value),
                                  ('rx_dropped', port_stat.rx_dropped.value),
                                  ('tx_dropped', port_stat.tx_dropped.value),
                                  ('rx_errors', port_stat.rx_errors.value),
                                  ('tx_errors', port_stat.tx_errors.value)]

            port_no = port_stat.port_no.value

            for stat_name, stat_value in statistics_to_send:
                # Every piece must be inside the f-string, otherwise the
                # counter name is not interpolated and all counters would
                # share the same namespace.
                namespace = (f'kytos.kronos.{switch.id}.port_no.{port_no}'
                             f'.{stat_name}')
                content = {'namespace': namespace,
                           'value': stat_value,
                           'callback': self._save_event_callback,
                           'timestamp': time.time()}
                event = KytosEvent(name='kytos.kronos.save', content=content)
                self._app_buffer.put(event)

            log.debug(debug_msg, port_stat.port_no.value, switch.id,
                      port_stat.rx_bytes.value, port_stat.tx_bytes.value,
                      port_stat.rx_dropped.value, port_stat.tx_dropped.value,
                      port_stat.rx_errors.value, port_stat.tx_errors.value)

    @staticmethod
    def _update_controller_interface(switch, port_stats):
        """Update the stats of the switch interface matching *port_stats*.

        Nothing is done if the switch has no interface for the port number.
        """
        port_no = port_stats.port_no.value
        iface = switch.get_interface_by_port_no(port_no)
        if iface is not None:
            if iface.stats is None:
                iface.stats = OFCorePortStats()
            iface.stats.update(port_stats)
332
333
334 1
class AggregateStats(Stats):
    """Deal with AggregateStats message."""

    # ``listen`` reads ``cls.rrd``, so the database must be stored under this
    # name; otherwise the ``None`` inherited from ``Stats`` would be used.
    rrd = RRD('aggr', ('packet_count', 'byte_count', 'flow_count'))
    # Former attribute name, kept as an alias for backward compatibility.
    _rrd = rrd

    def request(self, conn):
        """Ask for aggregate stats.

        Args:
            conn: Connection to the switch that will receive the request.

        """
        body = AggregateStatsRequest()  # Port.OFPP_NONE and All Tables
        req = StatsRequest(body_type=StatsType.OFPST_AGGREGATE, body=body)
        self._send_event(req, conn)
        log.debug('Aggregate Stats request for switch %s sent.',
                  conn.switch.dpid)

    @classmethod
    def listen(cls, switch, aggregate_stats):
        """Receive aggregate stats and store them in the class RRD.

        Args:
            switch: Switch that sent the stats; its id is the RRD index.
            aggregate_stats (iterable): Aggregate stats of the reply.

        """
        # The logger formats lazily with the % operator, so the template
        # needs %s placeholders rather than str.format's {}.
        debug_msg = 'Received aggregate stats from switch %s:' \
                    ' packet_count %s, byte_count %s, flow_count %s'

        for aggregate in aggregate_stats:
            # need to choose the _id to aggregate_stats
            # this class isn't used yet.
            cls.rrd.update((switch.id,),
                           packet_count=aggregate.packet_count.value,
                           byte_count=aggregate.byte_count.value,
                           flow_count=aggregate.flow_count.value)

            log.debug(debug_msg, switch.id, aggregate.packet_count.value,
                      aggregate.byte_count.value, aggregate.flow_count.value)
363
364
365 1
class FlowStats(Stats):
    """Handle FlowStats messages."""

    def request(self, conn):
        """Send a flow stats request to the switch behind *conn*."""
        stats_request = self._get_versioned_request(conn.protocol.version)
        self._send_event(stats_request, conn)
        log.debug('FlowStats request for switch %s sent.', conn.switch.id)

    @staticmethod
    def _get_versioned_request(of_version):
        """Build the flow stats request for the given OpenFlow version.

        Only version 0x01 gets a ``StatsRequest``; every other version gets
        a v0x04 ``MultipartRequest``.
        """
        if of_version != 0x01:
            return MultipartRequest(
                multipart_type=MultipartType.OFPMP_FLOW,
                body=v0x04.FlowStatsRequest())
        return StatsRequest(
            body_type=StatsType.OFPST_FLOW,
            body=v0x01.FlowStatsRequest())

    def listen(self, switch, flows_stats):
        """Process a flow stats reply.

        Each flow of the reply refreshes the stats of the matching flow of
        *switch* (if any) and has its packet and byte counters queued, in
        this order, as ``kytos.kronos.save`` events in the app buffer.

        Args:
            switch: Switch that sent the stats.
            flows_stats (iterable): Flow stats of the reply.

        """
        flow_class = FlowFactory.get_class(switch)
        for flow_stat in flows_stats:
            flow = flow_class.from_of_flow_stats(flow_stat, switch)

            # Keep the controller's copy of the flow up to date.
            known_flow = switch.get_flow_by_id(flow.id)
            if known_flow:
                known_flow.stats = flow.stats

            # One kytos/kronos save event per counter.
            for counter in ('packet_count', 'byte_count'):
                save_event = KytosEvent(
                    name='kytos.kronos.save',
                    content={
                        'namespace': (f'kytos.kronos.{switch.id}'
                                      f'.flow_id.{flow.id}.{counter}'),
                        'value': getattr(flow.stats, counter),
                        'callback': self._save_event_callback,
                        'timestamp': time.time(),
                    })
                self._app_buffer.put(save_event)
416