Passed
Pull Request — master (#42)
by Jose
02:25
created

build.stats.RRD.get_or_create_rrd()   A

Complexity

Conditions 4

Size

Total Lines 21
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 20

Importance

Changes 0
Metric Value
cc 4
eloc 11
nop 3
dl 0
loc 21
ccs 0
cts 11
cp 0
crap 20
rs 9.85
c 0
b 0
f 0
1
"""Module with Classes to handle statistics."""
2 1
import time
3 1
from abc import ABCMeta, abstractmethod
4
5 1
import pyof.v0x01.controller2switch.common as v0x01
6
# pylint: disable=C0411,C0412
7 1
from pyof.v0x01.common.phy_port import Port
8 1
from pyof.v0x01.controller2switch.common import AggregateStatsRequest
9 1
from pyof.v0x01.controller2switch.stats_request import StatsRequest, StatsType
10 1
from pyof.v0x04.controller2switch import multipart_request as v0x04
11 1
from pyof.v0x04.controller2switch.common import MultipartType
12 1
from pyof.v0x04.controller2switch.multipart_request import MultipartRequest
13
14 1
from kytos.core import KytosEvent, log
15
# v0x01 and v0x04 PortStats are version independent
16 1
from napps.kytos.of_core.flow import FlowFactory
17 1
from napps.kytos.of_core.flow import PortStats as OFCorePortStats
18
19
20 1
class Stats(metaclass=ABCMeta):
21
    """Abstract class for Statistics implementation."""
22
23 1
    def __init__(self, msg_out_buffer, msg_app_buffer):
24
        """Store a reference to the controller's buffers.
25
26
        Args:
27
            msg_out_buffer: Where to send events.
28
            msg_app_buffer: Where to send events to other NApps.
29
30
        """
31
        self._buffer = msg_out_buffer
32
        self._app_buffer = msg_app_buffer
33
34 1
    @abstractmethod
35
    def request(self, conn):
36
        """Request statistics."""
37
38 1
    @abstractmethod
39
    def listen(self, switch, stats):
40
        """Listen statistic replies."""
41
42 1
    def _send_event(self, req, conn):
43
        event = KytosEvent(
44
            name='kytos/of_stats.messages.out.ofpt_stats_request',
45
            content={'message': req, 'destination': conn})
46
        self._buffer.put(event)
47
48 1
    @classmethod
49
    def _save_event_callback(cls, _event, data, error):
50
        """Execute the callback to handle with kronos event to save data."""
51
        if error:
52
            log.error(f'Can\'t save stats in kytos/kronos: {error}')
53
        log.debug(data)
54
55
56 1
class PortStats(Stats):
57
    """Deal with PortStats messages."""
58
59 1
    def request(self, conn):
60
        """Ask for port stats."""
61
        request = self._get_versioned_request(conn.protocol.version)
62
        self._send_event(request, conn)
63
        log.debug('PortStats request for switch %s sent.', conn.switch.id)
64
65 1
    @staticmethod
66
    def _get_versioned_request(of_version):
67
        if of_version == 0x01:
68
            return StatsRequest(
69
                body_type=StatsType.OFPST_PORT,
70
                body=v0x01.PortStatsRequest(Port.OFPP_NONE))  # All ports
71
        return MultipartRequest(
72
            multipart_type=MultipartType.OFPMP_PORT_STATS,
73
            body=v0x04.PortStatsRequest())
74
75 1
    def listen(self, switch, ports_stats):
76
        """Receive port stats."""
77
        debug_msg = 'Received port %s stats of switch %s: rx_bytes %s,' \
78
                    ' tx_bytes %s, rx_dropped %s, tx_dropped %s,' \
79
                    ' rx_errors %s, tx_errors %s'
80
81
        for port_stat in ports_stats:
82
            self._update_controller_interface(switch, port_stat)
83
84
            statistics_to_send = {'rx_bytes': port_stat.rx_bytes.value,
85
                                  'tx_bytes': port_stat.tx_bytes.value,
86
                                  'rx_dropped': port_stat.rx_dropped.value,
87
                                  'tx_dropped': port_stat.tx_dropped.value,
88
                                  'rx_errors': port_stat.rx_errors.value,
89
                                  'tx_errors': port_stat.tx_errors.value}
90
91
            port_no = port_stat.port_no.value
92
93
            namespace = f'kytos.kronos.{switch.id}.port_no.{port_no}'
94
            content = {'namespace': namespace,
95
                       'value': statistics_to_send,
96
                       'callback': self._save_event_callback,
97
                       'timestamp': time.time()}
98
            event = KytosEvent(name='kytos.kronos.save', content=content)
99
            self._app_buffer.put(event)
100
101
            log.debug(debug_msg, port_stat.port_no.value, switch.id,
102
                      port_stat.rx_bytes.value, port_stat.tx_bytes.value,
103
                      port_stat.rx_dropped.value, port_stat.tx_dropped.value,
104
                      port_stat.rx_errors.value, port_stat.tx_errors.value)
105
106 1
    @staticmethod
107
    def _update_controller_interface(switch, port_stats):
108
        port_no = port_stats.port_no.value
109
        iface = switch.get_interface_by_port_no(port_no)
110
        if iface is not None:
111
            if iface.stats is None:
112
                iface.stats = OFCorePortStats()
113
            iface.stats.update(port_stats)
114
115
116 1
class AggregateStats(Stats):
117
    """Deal with AggregateStats message."""
118
119 1
    def request(self, conn):
120
        """Ask for flow stats."""
121
        body = AggregateStatsRequest()  # Port.OFPP_NONE and All Tables
122
        req = StatsRequest(body_type=StatsType.OFPST_AGGREGATE, body=body)
123
        self._send_event(req, conn)
124
        log.debug('Aggregate Stats request for switch %s sent.',
125
                  conn.switch.dpid)
126
127 1
    def listen(self, switch, aggregate_stats):
128
        """Receive flow stats."""
129
        debug_msg = 'Received aggregate stats from switch {}:' \
130
                    ' packet_count {}, byte_count {}, flow_count {}'
131
132
        for aggregate in aggregate_stats:
133
            # need to choose the _id to aggregate_stats
134
            # this class isn't used yet.
135
136
            log.debug(debug_msg, switch.id, aggregate.packet_count.value,
137
                      aggregate.byte_count.value, aggregate.flow_count.value)
138
139
            # Save aggregate stats using kytos/kronos
140
            namespace = f'kytos.kronos.aggregated_stats.{switch.id}'
141
            stats_to_send = {'aggregate_id': aggregate.id,
142
                             'packet_count': aggregate.packet_count.value,
143
                             'byte_count': aggregate.byte_count.value,
144
                             'flow_count': aggregate.flow_count.value}
145
146
            content = {'namespace': namespace,
147
                       'value': stats_to_send,
148
                       'callback': self._save_event_callback,
149
                       'timestamp': time.time()}
150
151
            event = KytosEvent(name='kytos.kronos.save', content=content)
152
            self._app_buffer.put(event)
153
154
155 1
class FlowStats(Stats):
156
    """Deal with FlowStats message."""
157
158 1
    def request(self, conn):
159
        """Ask for flow stats."""
160
        request = self._get_versioned_request(conn.protocol.version)
161
        self._send_event(request, conn)
162
        log.debug('FlowStats request for switch %s sent.', conn.switch.id)
163
164 1
    @staticmethod
165
    def _get_versioned_request(of_version):
166
        if of_version == 0x01:
167
            return StatsRequest(
168
                body_type=StatsType.OFPST_FLOW,
169
                body=v0x01.FlowStatsRequest())
170
        return MultipartRequest(
171
            multipart_type=MultipartType.OFPMP_FLOW,
172
            body=v0x04.FlowStatsRequest())
173
174 1
    def listen(self, switch, flows_stats):
175
        """Receive flow stats."""
176
        flow_class = FlowFactory.get_class(switch)
177
        for flow_stat in flows_stats:
178
            flow = flow_class.from_of_flow_stats(flow_stat, switch)
179
180
            # Update controller's flow
181
            controller_flow = switch.get_flow_by_id(flow.id)
182
            if controller_flow:
183
                controller_flow.stats = flow.stats
184
185
            # Save packet_count using kytos/kronos
186
            namespace = f'kytos.kronos.{switch.id}.flow_id.{flow.id}'
187
            content = {'namespace': namespace,
188
                       'value': {'packet_count': flow.stats.packet_count},
189
                       'callback': self._save_event_callback,
190
                       'timestamp': time.time()}
191
192
            event = KytosEvent(name='kytos.kronos.save', content=content)
193
            self._app_buffer.put(event)
194
195
            # Save byte_count using kytos/kronos
196
            namespace = f'kytos.kronos.{switch.id}.flow_id.{flow.id}'
197
            content = {'namespace': namespace,
198
                       'value': {'byte_count': flow.stats.byte_count},
199
                       'callback': self._save_event_callback,
200
                       'timestamp': time.time()}
201
202
            event = KytosEvent(name='kytos.kronos.save', content=content)
203
            self._app_buffer.put(event)
204