1 | """Main module of amlight/kytos_stats Kytos Network Application. |
||
2 | |||
3 | This NApp does operations with flows not covered by Kytos itself. |
||
4 | """ |
||
5 | # pylint: disable=too-many-return-statements,too-many-instance-attributes |
||
6 | # pylint: disable=too-many-arguments,too-many-branches,too-many-statements |
||
7 | |||
8 | 1 | from collections import defaultdict |
|
9 | 1 | from datetime import datetime |
|
10 | |||
11 | 1 | from kytos.core import KytosNApp, log, rest |
|
12 | 1 | from kytos.core.events import KytosEvent |
|
13 | 1 | from kytos.core.helpers import alisten_to, listen_to |
|
14 | 1 | from kytos.core.rest_api import HTTPException, JSONResponse, Request |
|
15 | |||
16 | |||
17 | # pylint: disable=too-many-public-methods |
||
18 | 1 | class Main(KytosNApp): |
|
19 | """Main class of amlight/kytos_stats NApp. |
||
20 | This class is the entry point for this napp. |
||
21 | """ |
||
22 | |||
23 | 1 | def setup(self): |
|
24 | """Replace the '__init__' method for the KytosNApp subclass. |
||
25 | The setup method is automatically called by the controller when your |
||
26 | application is loaded. |
||
27 | So, if you have any setup routine, insert it here. |
||
28 | """ |
||
29 | 1 | log.info('Starting Kytos/Amlight flow manager') |
|
30 | 1 | self.flows_stats_dict = {} |
|
31 | 1 | self.tables_stats_dict = {} |
|
32 | # port stats data by dpid by port_no |
||
33 | 1 | self.port_stats_dict: dict[str, dict[int, dict]] = defaultdict(dict) |
|
34 | |||
35 | 1 | def execute(self): |
|
36 | """This method is executed right after the setup method execution. |
||
37 | You can also use this method in loop mode if you add to the above setup |
||
38 | method a line like the following example: |
||
39 | self.execute_as_loop(30) # 30-second interval. |
||
40 | """ |
||
41 | |||
42 | 1 | def shutdown(self): |
|
43 | """This method is executed when your napp is unloaded. |
||
44 | If you have some cleanup procedure, insert it here. |
||
45 | """ |
||
46 | |||
47 | 1 | def flow_from_id(self, flow_id): |
|
48 | """Flow from given flow_id.""" |
||
49 | 1 | return self.flows_stats_dict.get(flow_id) |
|
50 | |||
51 | 1 | def flow_stats_by_dpid_flow_id(self, dpids): |
|
52 | """ Auxiliar funcion for v1/flow/stats endpoint implementation. |
||
53 | """ |
||
54 | 1 | flow_stats_by_id = {} |
|
55 | 1 | flows_stats_dict_copy = self.flows_stats_dict.copy() |
|
56 | 1 | for flow_id, flow in flows_stats_dict_copy.items(): |
|
57 | dpid = flow.switch.dpid |
||
58 | if dpid in dpids: |
||
59 | if dpid not in flow_stats_by_id: |
||
60 | flow_stats_by_id[dpid] = {} |
||
61 | info_flow_as_dict = flow.stats.as_dict() |
||
62 | info_flow_as_dict.update({"cookie": flow.cookie}) |
||
63 | info_flow_as_dict.update({"priority": flow.priority}) |
||
64 | info_flow_as_dict.update({"match": flow.match.as_dict()}) |
||
65 | flow_stats_by_id[dpid].update({flow_id: info_flow_as_dict}) |
||
66 | 1 | return flow_stats_by_id |
|
67 | |||
68 | 1 | def table_stats_by_dpid_table_id(self, dpids, table_ids): |
|
69 | """ Auxiliar funcion for v1/table/stats endpoint implementation. |
||
70 | """ |
||
71 | table_stats_by_id = {} |
||
72 | tables_stats_dict_copy = self.tables_stats_dict.copy() |
||
73 | for dpid, tables in tables_stats_dict_copy.items(): |
||
74 | if dpid not in dpids: |
||
75 | continue |
||
76 | table_stats_by_id[dpid] = {} |
||
77 | if len(table_ids) == 0: |
||
78 | table_ids = list(tables.keys()) |
||
79 | for table_id, table in tables.items(): |
||
80 | if table_id in table_ids: |
||
81 | table_dict = table.as_dict() |
||
82 | del table_dict['switch'] |
||
83 | table_stats_by_id[dpid][table_id] = table_dict |
||
84 | return table_stats_by_id |
||
85 | |||
86 | 1 | def port_stats_filter( |
|
87 | self, f_dpids: list[str], f_ports: list[int] |
||
88 | ) -> dict: |
||
89 | """ Auxiliar funcion for v1/port/stats endpoint implementation. |
||
90 | """ |
||
91 | 1 | port_stats = {} |
|
92 | 1 | dpid_keys = ( |
|
93 | (dpid for dpid in f_dpids if dpid in self.port_stats_dict) |
||
94 | if f_dpids |
||
95 | else self.port_stats_dict.keys() |
||
96 | ) |
||
97 | 1 | for dpid in dpid_keys: |
|
98 | 1 | port_stats[dpid] = {} |
|
99 | 1 | port_keys = f_ports |
|
100 | 1 | if not f_ports: |
|
101 | 1 | port_keys = self.port_stats_dict[dpid].keys() |
|
102 | 1 | for port_no in port_keys: |
|
103 | 1 | if p_stat := self.port_stats_dict[dpid].get(port_no): |
|
104 | 1 | port_stats[dpid][port_no] = p_stat |
|
105 | 1 | return port_stats |
|
106 | |||
107 | 1 | @rest('v1/flow/stats') |
|
108 | 1 | def flow_stats(self, request: Request) -> JSONResponse: |
|
109 | """Return the flows stats by dpid. |
||
110 | Return the stats of all flows if dpid is None |
||
111 | """ |
||
112 | 1 | dpids = request.query_params.getlist("dpid") |
|
113 | 1 | if len(dpids) == 0: |
|
114 | 1 | dpids = [sw.dpid for sw in self.controller.switches.values()] |
|
115 | 1 | flow_stats_by_id = self.flow_stats_by_dpid_flow_id(dpids) |
|
116 | 1 | return JSONResponse(flow_stats_by_id) |
|
117 | |||
118 | 1 | @rest('v1/table/stats') |
|
119 | 1 | def table_stats(self, request: Request) -> JSONResponse: |
|
120 | """Return the table stats by dpid, |
||
121 | and optionally by table_id. |
||
122 | """ |
||
123 | 1 | dpids = request.query_params.getlist("dpid") |
|
124 | 1 | if len(dpids) == 0: |
|
125 | 1 | dpids = [sw.dpid for sw in self.controller.switches.values()] |
|
126 | 1 | table_ids = request.query_params.getlist("table") |
|
127 | 1 | table_ids = list(map(int, table_ids)) |
|
128 | 1 | table_stats_dpid = self.table_stats_by_dpid_table_id(dpids, table_ids) |
|
129 | 1 | return JSONResponse(table_stats_dpid) |
|
130 | |||
131 | 1 | @rest('v1/port/stats') |
|
132 | 1 | async def port_stats(self, request: Request) -> JSONResponse: |
|
133 | """Return the port stats by dpid, and optionally by port.""" |
||
134 | 1 | dpids = request.query_params.getlist("dpid") |
|
135 | 1 | try: |
|
136 | 1 | ports = list(map(int, request.query_params.getlist("port"))) |
|
137 | 1 | except (ValueError, TypeError) as exc: |
|
138 | 1 | detail = "'port' value is supposed to be an integer" |
|
139 | 1 | raise HTTPException(400, detail=detail) from exc |
|
140 | 1 | return JSONResponse(self.port_stats_filter(dpids, ports)) |
|
141 | |||
142 | 1 | @rest('v1/packet_count/{flow_id}') |
|
143 | 1 | def packet_count(self, request: Request) -> JSONResponse: |
|
144 | """Packet count of an specific flow.""" |
||
145 | 1 | flow_id = request.path_params["flow_id"] |
|
146 | 1 | flow = self.flow_from_id(flow_id) |
|
147 | 1 | if flow is None: |
|
148 | 1 | raise HTTPException(404, detail="Flow does not exist") |
|
149 | 1 | try: |
|
150 | 1 | packet_per_second = \ |
|
151 | flow.stats.packet_count / flow.stats.duration_sec |
||
152 | except ZeroDivisionError: |
||
153 | packet_per_second = 0 |
||
154 | 1 | packet_stats = { |
|
155 | 'flow_id': flow_id, |
||
156 | 'packet_counter': flow.stats.packet_count, |
||
157 | 'packet_per_second': packet_per_second |
||
158 | } |
||
159 | 1 | return JSONResponse(packet_stats) |
|
160 | |||
161 | 1 | @rest('v1/bytes_count/{flow_id}') |
|
162 | 1 | def bytes_count(self, request: Request) -> JSONResponse: |
|
163 | """Bytes count of an specific flow.""" |
||
164 | 1 | flow_id = request.path_params["flow_id"] |
|
165 | 1 | flow = self.flow_from_id(flow_id) |
|
166 | 1 | if flow is None: |
|
167 | 1 | raise HTTPException(404, detail="Flow does not exist") |
|
168 | 1 | try: |
|
169 | 1 | bits_per_second = \ |
|
170 | flow.stats.byte_count * 8 / flow.stats.duration_sec |
||
171 | except ZeroDivisionError: |
||
172 | bits_per_second = 0 |
||
173 | 1 | bytes_stats = { |
|
174 | 'flow_id': flow_id, |
||
175 | 'bytes_counter': flow.stats.byte_count, |
||
176 | 'bits_per_second': bits_per_second |
||
177 | } |
||
178 | 1 | return JSONResponse(bytes_stats) |
|
179 | |||
180 | 1 | @rest('v1/packet_count/per_flow/{dpid}') |
|
181 | 1 | def packet_count_per_flow(self, request: Request) -> JSONResponse: |
|
182 | """Per flow packet count.""" |
||
183 | 1 | dpid = request.path_params["dpid"] |
|
184 | 1 | return self.flows_counters('packet_count', |
|
185 | dpid, |
||
186 | counter='packet_counter', |
||
187 | rate='packet_per_second') |
||
188 | |||
189 | 1 | @rest('v1/bytes_count/per_flow/{dpid}') |
|
190 | 1 | def bytes_count_per_flow(self, request: Request) -> JSONResponse: |
|
191 | """Per flow bytes count.""" |
||
192 | 1 | dpid = request.path_params["dpid"] |
|
193 | 1 | return self.flows_counters('byte_count', |
|
194 | dpid, |
||
195 | counter='bytes_counter', |
||
196 | rate='bits_per_second') |
||
197 | |||
198 | 1 | def flows_counters(self, field, dpid, counter=None, rate=None, |
|
199 | total=False) -> JSONResponse: |
||
200 | """Calculate flows statistics. |
||
201 | The returned statistics are both per flow and for the sum of flows |
||
202 | """ |
||
203 | |||
204 | 1 | if total: |
|
205 | count_flows = 0 |
||
206 | else: |
||
207 | 1 | count_flows = [] |
|
208 | 1 | if not counter: |
|
209 | counter = field |
||
210 | 1 | if not rate: |
|
211 | rate = field |
||
212 | |||
213 | # We don't have statistics persistence yet, so for now this only works |
||
214 | # for start and end equals to zero |
||
215 | 1 | flows = self.flow_stats_by_dpid_flow_id([dpid]) |
|
216 | 1 | flows = flows.get(dpid) |
|
217 | |||
218 | 1 | if flows is None: |
|
219 | 1 | return JSONResponse(count_flows) |
|
220 | 1 | for flow_id, stats in flows.items(): |
|
221 | 1 | count = stats[field] |
|
222 | 1 | if total: |
|
223 | count_flows += count |
||
224 | else: |
||
225 | 1 | try: |
|
226 | 1 | per_second = count / stats['duration_sec'] |
|
227 | 1 | except ZeroDivisionError: |
|
228 | 1 | per_second = 0 |
|
229 | 1 | if rate.startswith('bits'): |
|
230 | 1 | per_second *= 8 |
|
231 | 1 | count_flows.append({'flow_id': flow_id, |
|
232 | counter: count, |
||
233 | rate: per_second}) |
||
234 | 1 | return JSONResponse(count_flows) |
|
235 | |||
236 | 1 | @listen_to('kytos/of_core.flow_stats.received') |
|
237 | 1 | def on_stats_received(self, event): |
|
238 | """Capture flow stats messages for OpenFlow 1.3.""" |
||
239 | self.handle_stats_received(event) |
||
240 | |||
241 | 1 | def handle_stats_received(self, event): |
|
242 | """Handle flow stats messages for OpenFlow 1.3.""" |
||
243 | 1 | if 'replies_flows' in event.content: |
|
244 | 1 | replies_flows = event.content['replies_flows'] |
|
245 | 1 | self.handle_stats_reply_received(replies_flows) |
|
246 | |||
247 | 1 | def handle_stats_reply_received(self, replies_flows): |
|
248 | """Update the set of flows stats""" |
||
249 | 1 | self.flows_stats_dict.update({flow.id: flow for flow in replies_flows}) |
|
250 | |||
251 | 1 | @listen_to('kytos/of_core.table_stats.received') |
|
252 | 1 | def on_table_stats_received(self, event): |
|
253 | """Capture table stats messages for OpenFlow 1.3.""" |
||
254 | self.handle_table_stats_received(event) |
||
255 | |||
256 | 1 | def handle_table_stats_received(self, event): |
|
257 | """Handle table stats messages for OpenFlow 1.3.""" |
||
258 | replies_tables = event.content['replies_tables'] |
||
259 | self.handle_table_stats_reply_received(replies_tables) |
||
260 | |||
261 | 1 | def handle_table_stats_reply_received(self, replies_tables): |
|
262 | """Update the set of tables stats""" |
||
263 | 1 | for table in replies_tables: |
|
264 | 1 | switch_id = table.switch.id |
|
265 | 1 | if switch_id not in self.tables_stats_dict: |
|
266 | 1 | self.tables_stats_dict[switch_id] = {} |
|
267 | 1 | self.tables_stats_dict[switch_id][table.table_id] = table |
|
268 | |||
269 | 1 | @alisten_to('kytos/of_core.port_stats') |
|
270 | 1 | async def on_port_stats(self, event: KytosEvent) -> None: |
|
271 | """Handle port stats messages for OpenFlow 1.3.""" |
||
272 | 1 | port_stats = event.content.get('port_stats') |
|
273 | 1 | switch = event.content.get('switch') |
|
274 | 1 | if not port_stats or not switch: |
|
275 | 1 | return |
|
276 | 1 | updated_at = datetime.utcnow() |
|
277 | 1 | for port in port_stats: |
|
278 | 1 | self.port_stats_dict[switch.id][port.port_no.value] = { |
|
279 | "port_no": port.port_no.value, |
||
280 | "rx_packets": port.rx_packets.value, |
||
281 | "tx_packets": port.tx_packets.value, |
||
282 | "rx_bytes": port.rx_bytes.value, |
||
283 | "tx_bytes": port.tx_bytes.value, |
||
284 | "rx_dropped": port.rx_dropped.value, |
||
285 | "tx_dropped": port.tx_dropped.value, |
||
286 | "rx_errors": port.rx_errors.value, |
||
287 | "tx_errors": port.tx_errors.value, |
||
288 | "rx_frame_err": port.rx_frame_err.value, |
||
289 | "rx_over_err": port.rx_over_err.value, |
||
290 | "rx_crc_err": port.rx_crc_err.value, |
||
291 | "collisions": port.collisions.value, |
||
292 | "duration_sec": port.duration_sec.value, |
||
293 | "duration_nsec": port.duration_nsec.value, |
||
294 | "updated_at": updated_at, |
||
295 | } |
||
296 |