Passed
Push — master ( 0db5e5...061016 )
by Vinicius
02:49 queued 14s
created

build.main.Main.on_stats_received()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1.037

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 4
rs 10
c 0
b 0
f 0
ccs 2
cts 3
cp 0.6667
cc 1
nop 2
crap 1.037
1
"""Main module of amlight/kytos_stats Kytos Network Application.
2
3
This NApp does operations with flows not covered by Kytos itself.
4
"""
5
# pylint: disable=too-many-return-statements,too-many-instance-attributes
6
# pylint: disable=too-many-arguments,too-many-branches,too-many-statements
7
8 1
from collections import defaultdict
9 1
from datetime import datetime
10
11 1
from kytos.core import KytosNApp, log, rest
12 1
from kytos.core.events import KytosEvent
13 1
from kytos.core.helpers import alisten_to, listen_to
14 1
from kytos.core.rest_api import HTTPException, JSONResponse, Request
15
16
17
# pylint: disable=too-many-public-methods
18 1
class Main(KytosNApp):
19
    """Main class of amlight/kytos_stats NApp.
20
    This class is the entry point for this napp.
21
    """
22
23 1
    def setup(self):
24
        """Replace the '__init__' method for the KytosNApp subclass.
25
        The setup method is automatically called by the controller when your
26
        application is loaded.
27
        So, if you have any setup routine, insert it here.
28
        """
29 1
        log.info('Starting Kytos/Amlight flow manager')
30 1
        self.flows_stats_dict = {}
31 1
        self.tables_stats_dict = {}
32
        # port stats data by dpid by port_no
33 1
        self.port_stats_dict: dict[str, dict[int, dict]] = defaultdict(dict)
34
35 1
    def execute(self):
36
        """This method is executed right after the setup method execution.
37
        You can also use this method in loop mode if you add to the above setup
38
        method a line like the following example:
39
            self.execute_as_loop(30)  # 30-second interval.
40
        """
41
42 1
    def shutdown(self):
43
        """This method is executed when your napp is unloaded.
44
        If you have some cleanup procedure, insert it here.
45
        """
46
47 1
    def flow_from_id(self, flow_id):
48
        """Flow from given flow_id."""
49 1
        return self.flows_stats_dict.get(flow_id)
50
51 1
    def flow_stats_by_dpid_flow_id(self, dpids):
52
        """ Auxiliar funcion for v1/flow/stats endpoint implementation.
53
        """
54 1
        flow_stats_by_id = {}
55 1
        flows_stats_dict_copy = self.flows_stats_dict.copy()
56 1
        for flow_id, flow in flows_stats_dict_copy.items():
57
            dpid = flow.switch.dpid
58
            if dpid in dpids:
59
                if dpid not in flow_stats_by_id:
60
                    flow_stats_by_id[dpid] = {}
61
                info_flow_as_dict = flow.stats.as_dict()
62
                info_flow_as_dict.update({"cookie": flow.cookie})
63
                info_flow_as_dict.update({"priority": flow.priority})
64
                info_flow_as_dict.update({"match": flow.match.as_dict()})
65
                flow_stats_by_id[dpid].update({flow_id: info_flow_as_dict})
66 1
        return flow_stats_by_id
67
68 1
    def table_stats_by_dpid_table_id(self, dpids, table_ids):
69
        """ Auxiliar funcion for v1/table/stats endpoint implementation.
70
        """
71
        table_stats_by_id = {}
72
        tables_stats_dict_copy = self.tables_stats_dict.copy()
73
        for dpid, tables in tables_stats_dict_copy.items():
74
            if dpid not in dpids:
75
                continue
76
            table_stats_by_id[dpid] = {}
77
            if len(table_ids) == 0:
78
                table_ids = list(tables.keys())
79
            for table_id, table in tables.items():
80
                if table_id in table_ids:
81
                    table_dict = table.as_dict()
82
                    del table_dict['switch']
83
                    table_stats_by_id[dpid][table_id] = table_dict
84
        return table_stats_by_id
85
86 1
    def port_stats_filter(
87
        self, f_dpids: list[str], f_ports: list[int]
88
    ) -> dict:
89
        """ Auxiliar funcion for v1/port/stats endpoint implementation.
90
        """
91 1
        port_stats = {}
92 1
        dpid_keys = (
93
            (dpid for dpid in f_dpids if dpid in self.port_stats_dict)
94
            if f_dpids
95
            else self.port_stats_dict.keys()
96
        )
97 1
        for dpid in dpid_keys:
98 1
            port_stats[dpid] = {}
99 1
            port_keys = f_ports
100 1
            if not f_ports:
101 1
                port_keys = self.port_stats_dict[dpid].keys()
102 1
            for port_no in port_keys:
103 1
                if p_stat := self.port_stats_dict[dpid].get(port_no):
104 1
                    port_stats[dpid][port_no] = p_stat
105 1
        return port_stats
106
107 1
    @rest('v1/flow/stats')
108 1
    def flow_stats(self, request: Request) -> JSONResponse:
109
        """Return the flows stats by dpid.
110
        Return the stats of all flows if dpid is None
111
        """
112 1
        dpids = request.query_params.getlist("dpid")
113 1
        if len(dpids) == 0:
114 1
            dpids = [sw.dpid for sw in self.controller.switches.values()]
115 1
        flow_stats_by_id = self.flow_stats_by_dpid_flow_id(dpids)
116 1
        return JSONResponse(flow_stats_by_id)
117
118 1
    @rest('v1/table/stats')
119 1
    def table_stats(self, request: Request) -> JSONResponse:
120
        """Return the table stats by dpid,
121
        and optionally by table_id.
122
        """
123 1
        dpids = request.query_params.getlist("dpid")
124 1
        if len(dpids) == 0:
125 1
            dpids = [sw.dpid for sw in self.controller.switches.values()]
126 1
        table_ids = request.query_params.getlist("table")
127 1
        table_ids = list(map(int, table_ids))
128 1
        table_stats_dpid = self.table_stats_by_dpid_table_id(dpids, table_ids)
129 1
        return JSONResponse(table_stats_dpid)
130
131 1
    @rest('v1/port/stats')
132 1
    async def port_stats(self, request: Request) -> JSONResponse:
133
        """Return the port stats by dpid, and optionally by port."""
134 1
        dpids = request.query_params.getlist("dpid")
135 1
        try:
136 1
            ports = list(map(int, request.query_params.getlist("port")))
137 1
        except (ValueError, TypeError) as exc:
138 1
            detail = "'port' value is supposed to be an integer"
139 1
            raise HTTPException(400, detail=detail) from exc
140 1
        return JSONResponse(self.port_stats_filter(dpids, ports))
141
142 1
    @rest('v1/packet_count/{flow_id}')
143 1
    def packet_count(self, request: Request) -> JSONResponse:
144
        """Packet count of an specific flow."""
145 1
        flow_id = request.path_params["flow_id"]
146 1
        flow = self.flow_from_id(flow_id)
147 1
        if flow is None:
148 1
            raise HTTPException(404, detail="Flow does not exist")
149 1
        try:
150 1
            packet_per_second = \
151
                flow.stats.packet_count / flow.stats.duration_sec
152
        except ZeroDivisionError:
153
            packet_per_second = 0
154 1
        packet_stats = {
155
            'flow_id': flow_id,
156
            'packet_counter': flow.stats.packet_count,
157
            'packet_per_second': packet_per_second
158
            }
159 1
        return JSONResponse(packet_stats)
160
161 1
    @rest('v1/bytes_count/{flow_id}')
162 1
    def bytes_count(self, request: Request) -> JSONResponse:
163
        """Bytes count of an specific flow."""
164 1
        flow_id = request.path_params["flow_id"]
165 1
        flow = self.flow_from_id(flow_id)
166 1
        if flow is None:
167 1
            raise HTTPException(404, detail="Flow does not exist")
168 1
        try:
169 1
            bits_per_second = \
170
                flow.stats.byte_count * 8 / flow.stats.duration_sec
171
        except ZeroDivisionError:
172
            bits_per_second = 0
173 1
        bytes_stats = {
174
            'flow_id': flow_id,
175
            'bytes_counter': flow.stats.byte_count,
176
            'bits_per_second': bits_per_second
177
            }
178 1
        return JSONResponse(bytes_stats)
179
180 1
    @rest('v1/packet_count/per_flow/{dpid}')
181 1
    def packet_count_per_flow(self, request: Request) -> JSONResponse:
182
        """Per flow packet count."""
183 1
        dpid = request.path_params["dpid"]
184 1
        return self.flows_counters('packet_count',
185
                                   dpid,
186
                                   counter='packet_counter',
187
                                   rate='packet_per_second')
188
189 1
    @rest('v1/bytes_count/per_flow/{dpid}')
190 1
    def bytes_count_per_flow(self, request: Request) -> JSONResponse:
191
        """Per flow bytes count."""
192 1
        dpid = request.path_params["dpid"]
193 1
        return self.flows_counters('byte_count',
194
                                   dpid,
195
                                   counter='bytes_counter',
196
                                   rate='bits_per_second')
197
198 1
    def flows_counters(self, field, dpid, counter=None, rate=None,
199
                       total=False) -> JSONResponse:
200
        """Calculate flows statistics.
201
        The returned statistics are both per flow and for the sum of flows
202
        """
203
204 1
        if total:
205
            count_flows = 0
206
        else:
207 1
            count_flows = []
208 1
            if not counter:
209
                counter = field
210 1
            if not rate:
211
                rate = field
212
213
        # We don't have statistics persistence yet, so for now this only works
214
        # for start and end equals to zero
215 1
        flows = self.flow_stats_by_dpid_flow_id([dpid])
216 1
        flows = flows.get(dpid)
217
218 1
        if flows is None:
219 1
            return JSONResponse(count_flows)
220 1
        for flow_id, stats in flows.items():
221 1
            count = stats[field]
222 1
            if total:
223
                count_flows += count
224
            else:
225 1
                try:
226 1
                    per_second = count / stats['duration_sec']
227 1
                except ZeroDivisionError:
228 1
                    per_second = 0
229 1
                if rate.startswith('bits'):
230 1
                    per_second *= 8
231 1
                count_flows.append({'flow_id': flow_id,
232
                                    counter: count,
233
                                    rate: per_second})
234 1
        return JSONResponse(count_flows)
235
236 1
    @listen_to('kytos/of_core.flow_stats.received')
237 1
    def on_stats_received(self, event):
238
        """Capture flow stats messages for OpenFlow 1.3."""
239
        self.handle_stats_received(event)
240
241 1
    def handle_stats_received(self, event):
242
        """Handle flow stats messages for OpenFlow 1.3."""
243 1
        if 'replies_flows' in event.content:
244 1
            replies_flows = event.content['replies_flows']
245 1
            self.handle_stats_reply_received(replies_flows)
246
247 1
    def handle_stats_reply_received(self, replies_flows):
248
        """Update the set of flows stats"""
249 1
        self.flows_stats_dict.update({flow.id: flow for flow in replies_flows})
250
251 1
    @listen_to('kytos/of_core.table_stats.received')
252 1
    def on_table_stats_received(self, event):
253
        """Capture table stats messages for OpenFlow 1.3."""
254
        self.handle_table_stats_received(event)
255
256 1
    def handle_table_stats_received(self, event):
257
        """Handle table stats messages for OpenFlow 1.3."""
258
        replies_tables = event.content['replies_tables']
259
        self.handle_table_stats_reply_received(replies_tables)
260
261 1
    def handle_table_stats_reply_received(self, replies_tables):
262
        """Update the set of tables stats"""
263 1
        for table in replies_tables:
264 1
            switch_id = table.switch.id
265 1
            if switch_id not in self.tables_stats_dict:
266 1
                self.tables_stats_dict[switch_id] = {}
267 1
            self.tables_stats_dict[switch_id][table.table_id] = table
268
269 1
    @alisten_to('kytos/of_core.port_stats')
270 1
    async def on_port_stats(self, event: KytosEvent) -> None:
271
        """Handle port stats messages for OpenFlow 1.3."""
272 1
        port_stats = event.content.get('port_stats')
273 1
        switch = event.content.get('switch')
274 1
        if not port_stats or not switch:
275 1
            return
276 1
        updated_at = datetime.utcnow()
277 1
        for port in port_stats:
278 1
            self.port_stats_dict[switch.id][port.port_no.value] = {
279
                "port_no": port.port_no.value,
280
                "rx_packets": port.rx_packets.value,
281
                "tx_packets": port.tx_packets.value,
282
                "rx_bytes": port.rx_bytes.value,
283
                "tx_bytes": port.tx_bytes.value,
284
                "rx_dropped": port.rx_dropped.value,
285
                "tx_dropped": port.tx_dropped.value,
286
                "rx_errors": port.rx_errors.value,
287
                "tx_errors": port.tx_errors.value,
288
                "rx_frame_err": port.rx_frame_err.value,
289
                "rx_over_err": port.rx_over_err.value,
290
                "rx_crc_err": port.rx_crc_err.value,
291
                "collisions": port.collisions.value,
292
                "duration_sec": port.duration_sec.value,
293
                "duration_nsec": port.duration_nsec.value,
294
                "updated_at": updated_at,
295
            }
296