1 | """Module with Classes to handle statistics.""" |
||
2 | import time |
||
3 | from abc import ABCMeta, abstractmethod |
||
4 | |||
5 | import pyof.v0x01.controller2switch.common as v0x01 |
||
6 | # pylint: disable=C0411,C0412 |
||
7 | from pyof.v0x01.common.phy_port import Port |
||
8 | from pyof.v0x01.controller2switch.common import AggregateStatsRequest |
||
9 | from pyof.v0x01.controller2switch.stats_request import StatsRequest, StatsType |
||
10 | from pyof.v0x04.controller2switch import multipart_request as v0x04 |
||
11 | from pyof.v0x04.controller2switch.common import MultipartType |
||
12 | from pyof.v0x04.controller2switch.multipart_request import MultipartRequest |
||
13 | |||
14 | from kytos.core import KytosEvent, log |
||
15 | # v0x01 and v0x04 PortStats are version independent |
||
16 | from napps.kytos.of_core.flow import FlowFactory |
||
17 | from napps.kytos.of_core.flow import PortStats as OFCorePortStats |
||
18 | |||
19 | |||
20 | class Stats(metaclass=ABCMeta): |
||
21 | """Abstract class for Statistics implementation.""" |
||
22 | |||
23 | def __init__(self, msg_out_buffer, msg_app_buffer): |
||
24 | """Store a reference to the controller's buffers. |
||
25 | |||
26 | Args: |
||
27 | msg_out_buffer: Where to send events. |
||
28 | msg_app_buffer: Where to send events to other NApps. |
||
29 | |||
30 | """ |
||
31 | self._buffer = msg_out_buffer |
||
32 | self._app_buffer = msg_app_buffer |
||
33 | |||
34 | @abstractmethod |
||
35 | def request(self, conn): |
||
36 | """Request statistics.""" |
||
37 | |||
38 | @abstractmethod |
||
39 | def listen(self, switch, stats): |
||
40 | """Listen statistic replies.""" |
||
41 | |||
42 | def _send_event(self, req, conn): |
||
43 | event = KytosEvent( |
||
44 | name='kytos/of_stats.messages.out.ofpt_stats_request', |
||
45 | content={'message': req, 'destination': conn}) |
||
46 | self._buffer.put(event) |
||
47 | |||
48 | @classmethod |
||
49 | def _save_event_callback(cls, _event, data, error): |
||
50 | """Execute the callback to handle with kronos event to save data.""" |
||
51 | if error: |
||
52 | log.error(f'Can\'t save stats in kytos/kronos: {error}') |
||
53 | log.debug(data) |
||
54 | |||
55 | |||
56 | class PortStats(Stats): |
||
57 | """Deal with PortStats messages.""" |
||
58 | |||
59 | def request(self, conn): |
||
60 | """Ask for port stats.""" |
||
61 | request = self._get_versioned_request(conn.protocol.version) |
||
62 | self._send_event(request, conn) |
||
63 | log.debug('PortStats request for switch %s sent.', conn.switch.id) |
||
64 | |||
65 | @staticmethod |
||
66 | def _get_versioned_request(of_version): |
||
67 | if of_version == 0x01: |
||
68 | return StatsRequest( |
||
69 | body_type=StatsType.OFPST_PORT, |
||
70 | body=v0x01.PortStatsRequest(Port.OFPP_NONE)) # All ports |
||
71 | return MultipartRequest( |
||
72 | multipart_type=MultipartType.OFPMP_PORT_STATS, |
||
73 | body=v0x04.PortStatsRequest()) |
||
74 | |||
75 | def listen(self, switch, ports_stats): |
||
76 | """Receive port stats.""" |
||
77 | debug_msg = 'Received port %s stats of switch %s: rx_bytes %s,' \ |
||
78 | ' tx_bytes %s, rx_dropped %s, tx_dropped %s,' \ |
||
79 | ' rx_errors %s, tx_errors %s' |
||
80 | |||
81 | for port_stat in ports_stats: |
||
82 | self._update_controller_interface(switch, port_stat) |
||
83 | |||
84 | statistics_to_send = {'rx_bytes': port_stat.rx_bytes.value, |
||
85 | 'tx_bytes': port_stat.tx_bytes.value, |
||
86 | 'rx_dropped': port_stat.rx_dropped.value, |
||
87 | 'tx_dropped': port_stat.tx_dropped.value, |
||
88 | 'rx_errors': port_stat.rx_errors.value, |
||
89 | 'tx_errors': port_stat.tx_errors.value} |
||
90 | |||
91 | port_no = port_stat.port_no.value |
||
92 | |||
93 | namespace = f'kytos.kronos.{switch.id}.port_no.{port_no}' |
||
94 | content = {'namespace': namespace, |
||
95 | 'value': statistics_to_send, |
||
96 | 'callback': self._save_event_callback, |
||
97 | 'timestamp': time.time()} |
||
98 | event = KytosEvent(name='kytos.kronos.save', content=content) |
||
99 | self._app_buffer.put(event) |
||
100 | |||
101 | log.debug(debug_msg, port_stat.port_no.value, switch.id, |
||
102 | port_stat.rx_bytes.value, port_stat.tx_bytes.value, |
||
103 | port_stat.rx_dropped.value, port_stat.tx_dropped.value, |
||
104 | port_stat.rx_errors.value, port_stat.tx_errors.value) |
||
105 | |||
106 | @staticmethod |
||
107 | def _update_controller_interface(switch, port_stats): |
||
108 | port_no = port_stats.port_no.value |
||
109 | iface = switch.get_interface_by_port_no(port_no) |
||
110 | if iface is not None: |
||
111 | if iface.stats is None: |
||
112 | iface.stats = OFCorePortStats() |
||
113 | iface.stats.update(port_stats) |
||
114 | |||
115 | |||
116 | class AggregateStats(Stats): |
||
117 | """Deal with AggregateStats message.""" |
||
118 | |||
119 | def request(self, conn): |
||
120 | """Ask for flow stats.""" |
||
121 | body = AggregateStatsRequest() # Port.OFPP_NONE and All Tables |
||
122 | req = StatsRequest(body_type=StatsType.OFPST_AGGREGATE, body=body) |
||
123 | self._send_event(req, conn) |
||
124 | log.debug('Aggregate Stats request for switch %s sent.', |
||
125 | conn.switch.dpid) |
||
126 | |||
127 | def listen(self, switch, aggregate_stats): |
||
128 | """Receive flow stats.""" |
||
129 | debug_msg = 'Received aggregate stats from switch {}:' \ |
||
130 | ' packet_count {}, byte_count {}, flow_count {}' |
||
131 | |||
132 | for aggregate in aggregate_stats: |
||
133 | # need to choose the _id to aggregate_stats |
||
134 | # this class isn't used yet. |
||
135 | |||
136 | log.debug(debug_msg, switch.id, aggregate.packet_count.value, |
||
137 | aggregate.byte_count.value, aggregate.flow_count.value) |
||
138 | |||
139 | # Save aggregate stats using kytos/kronos |
||
140 | namespace = f'kytos.kronos.aggregated_stats.{switch.id}' |
||
141 | stats_to_send = {'aggregate_id': aggregate.id, |
||
142 | 'packet_count': aggregate.packet_count.value, |
||
143 | 'byte_count': aggregate.byte_count.value, |
||
144 | 'flow_count': aggregate.flow_count.value} |
||
145 | |||
146 | content = {'namespace': namespace, |
||
147 | 'value': stats_to_send, |
||
148 | 'callback': self._save_event_callback, |
||
149 | 'timestamp': time.time()} |
||
150 | |||
151 | event = KytosEvent(name='kytos.kronos.save', content=content) |
||
152 | self._app_buffer.put(event) |
||
153 | |||
154 | |||
155 | class FlowStats(Stats): |
||
156 | """Deal with FlowStats message.""" |
||
157 | |||
158 | def request(self, conn): |
||
159 | """Ask for flow stats.""" |
||
160 | request = self._get_versioned_request(conn.protocol.version) |
||
161 | self._send_event(request, conn) |
||
162 | log.debug('FlowStats request for switch %s sent.', conn.switch.id) |
||
163 | |||
164 | @staticmethod |
||
165 | def _get_versioned_request(of_version): |
||
166 | if of_version == 0x01: |
||
167 | return StatsRequest( |
||
168 | body_type=StatsType.OFPST_FLOW, |
||
169 | body=v0x01.FlowStatsRequest()) |
||
170 | return MultipartRequest( |
||
171 | multipart_type=MultipartType.OFPMP_FLOW, |
||
172 | body=v0x04.FlowStatsRequest()) |
||
173 | |||
174 | def listen(self, switch, flows_stats): |
||
175 | """Receive flow stats.""" |
||
176 | flow_class = FlowFactory.get_class(switch) |
||
177 | for flow_stat in flows_stats: |
||
178 | flow = flow_class.from_of_flow_stats(flow_stat, switch) |
||
179 | |||
180 | # Update controller's flow |
||
181 | controller_flow = switch.get_flow_by_id(flow.id) |
||
182 | if controller_flow: |
||
183 | controller_flow.stats = flow.stats |
||
184 | |||
185 | # Save packet_count using kytos/kronos |
||
186 | namespace = f'kytos.kronos.{switch.id}.flow_id.{flow.id}' |
||
187 | content = {'namespace': namespace, |
||
188 | 'value': {'packet_count': flow.stats.packet_count}, |
||
189 | 'callback': self._save_event_callback, |
||
190 | 'timestamp': time.time()} |
||
191 | |||
192 | event = KytosEvent(name='kytos.kronos.save', content=content) |
||
193 | self._app_buffer.put(event) |
||
194 | |||
195 | # Save byte_count using kytos/kronos |
||
196 | namespace = f'kytos.kronos.{switch.id}.flow_id.{flow.id}' |
||
197 | content = {'namespace': namespace, |
||
198 | 'value': {'byte_count': flow.stats.byte_count}, |
||
199 | 'callback': self._save_event_callback, |
||
200 | 'timestamp': time.time()} |
||
201 | |||
202 | event = KytosEvent(name='kytos.kronos.save', content=content) |
||
203 | self._app_buffer.put(event) |
||
204 |