Test Failed
Pull Request — master (#25)
by Vinicius
04:34
created

build.main.Main.flow_stats()   A

Complexity

Conditions 2

Size

Total Lines 10
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 2

Importance

Changes 0
Metric Value
eloc 7
dl 0
loc 10
rs 10
c 0
b 0
f 0
ccs 7
cts 7
cp 1
cc 2
nop 2
crap 2
"""Main module of amlight/kytos_stats Kytos Network Application.

This NApp provides flow operations that are not covered by Kytos itself.
"""
5
# pylint: disable=too-many-return-statements,too-many-instance-attributes
6
# pylint: disable=too-many-arguments,too-many-branches,too-many-statements
7
8 1
from kytos.core import KytosNApp, log, rest
9 1
from kytos.core.helpers import listen_to
10 1
from kytos.core.rest_api import HTTPException, JSONResponse, Request
11
12
13
# pylint: disable=too-many-public-methods
14 1
class Main(KytosNApp):
    """Main class of amlight/kytos_stats NApp.

    This class is the entry point for this NApp.
    """
18
19 1
    def setup(self):
        """Initialise the NApp state in place of a regular ``__init__``.

        The controller calls this method automatically when the application
        is loaded. It logs a start-up message and creates the three empty
        in-memory caches that the other methods of this class read and write.
        """
        log.info('Starting Kytos/Amlight flow manager')
        # flow id -> flow object (filled by handle_stats_reply_received).
        self.flows_stats_dict: dict = {}
        # switch id -> {table id -> table object}
        # (filled by handle_table_stats_reply_received).
        self.tables_stats_dict: dict = {}
        # switch id -> {port number -> dict of counters}
        # (filled by handle_port_stats).
        self.port_stats_dict: dict = {}
29 1
30
    def execute(self):
        """Run right after ``setup``; this NApp has nothing to do here.

        The method can also be used in loop mode by adding a line such as
        ``self.execute_as_loop(30)`` (a 30-second interval) to ``setup``.
        """
36 1
37
    def shutdown(self):
        """Run when the NApp is unloaded.

        No cleanup is performed; add any cleanup procedure here.
        """
41 1
42
    def flow_from_id(self, flow_id):
        """Look up a cached flow by its identifier.

        Returns the entry stored under ``flow_id`` in ``flows_stats_dict``,
        or ``None`` when there is no such entry.
        """
        cached_flows = self.flows_stats_dict
        return cached_flows.get(flow_id)
45 1
46
    def flow_stats_by_dpid_flow_id(self, dpids):
        """Auxiliary function for the v1/flow/stats endpoint implementation.

        Groups the cached flows by switch, keeping only the switches whose
        dpid is in ``dpids``.

        Returns a dict shaped ``{dpid: {flow_id: info}}`` where ``info`` is
        ``flow.stats.as_dict()`` extended with the "cookie", "priority" and
        "match" entries of the flow. Switches without any cached flow do not
        appear in the result.
        """
        grouped = {}
        # Walk a shallow copy of the cache instead of the live dictionary.
        for flow_id, flow in self.flows_stats_dict.copy().items():
            dpid = flow.switch.dpid
            if dpid not in dpids:
                continue
            switch_flows = grouped.setdefault(dpid, {})
            info = flow.stats.as_dict()
            info["cookie"] = flow.cookie
            info["priority"] = flow.priority
            info["match"] = flow.match.as_dict()
            switch_flows[flow_id] = info
        return grouped
62 1
63
    def table_stats_by_dpid_table_id(self, dpids, table_ids):
        """Auxiliary function for the v1/table/stats endpoint implementation.

        Args:
            dpids: switch ids to include in the result.
            table_ids: table ids to include; an empty collection selects
                every table of each selected switch.

        Returns:
            A dict shaped ``{dpid: {table_id: table_dict}}`` where
            ``table_dict`` is ``table.as_dict()`` without its 'switch' entry.
            Selected switches with cached tables always get an entry, even
            when no table matches the filter.
        """
        table_stats_by_id = {}
        # Walk a shallow copy of the cache instead of the live dictionary.
        for dpid, tables in self.tables_stats_dict.copy().items():
            if dpid not in dpids:
                continue
            selected = {}
            for table_id, table in tables.items():
                # The filter argument is never rebound: an empty filter must
                # select all tables of *each* switch, not only the table ids
                # that happen to exist on the first switch visited.
                if table_ids and table_id not in table_ids:
                    continue
                table_dict = table.as_dict()
                # Drop the switch entry if present; tolerate its absence.
                table_dict.pop('switch', None)
                selected[table_id] = table_dict
            table_stats_by_id[dpid] = selected
        return table_stats_by_id
80 1
81 1
    def port_stats_filter(
        self, f_dpids: list[str], f_ports: list[int]
    ) -> dict:
        """Auxiliary function for the v1/port/stats endpoint implementation.

        Args:
            f_dpids: switch ids to report; when empty, every switch present
                in ``port_stats_dict`` is reported.
            f_ports: port numbers to report; when empty, every cached port of
                each selected switch is reported.

        Returns:
            A dict shaped ``{dpid: {port_no: stats}}``. Every selected dpid
            gets an entry (possibly empty); ports without cached stats are
            omitted.
        """
        port_stats = {}
        dpid_keys = f_dpids if f_dpids else self.port_stats_dict.keys()
        for dpid in dpid_keys:
            # A requested dpid may have no cached stats at all; treat it as a
            # switch without ports instead of failing with KeyError.
            known_ports = self.port_stats_dict.get(dpid, {})
            port_keys = f_ports if f_ports else known_ports.keys()
            port_stats[dpid] = {
                port_no: p_stat
                for port_no in port_keys
                if (p_stat := known_ports.get(port_no))
            }
        return port_stats
95
96 1
    @rest('v1/flow/stats')
    def flow_stats(self, request: Request) -> JSONResponse:
        """Serve the flow statistics grouped by switch.

        Switches are selected with the "dpid" query parameter, which may be
        repeated. When it is absent, every switch known to the controller is
        used.
        """
        requested = request.query_params.getlist("dpid")
        if not requested:
            known_switches = self.controller.switches.values()
            requested = [switch.dpid for switch in known_switches]
        return JSONResponse(self.flow_stats_by_dpid_flow_id(requested))
106
107 1
    @rest('v1/table/stats')
    def table_stats(self, request: Request) -> JSONResponse:
        """Return the table stats by dpid, and optionally by table id.

        Query parameters (both may be repeated):
            dpid: switches to report; defaults to every switch known to the
                controller.
            table: integer table ids to report; defaults to all tables.

        Raises:
            HTTPException: 400 when a "table" value is not an integer.
        """
        dpids = request.query_params.getlist("dpid")
        if not dpids:
            dpids = [sw.dpid for sw in self.controller.switches.values()]
        raw_table_ids = request.query_params.getlist("table")
        try:
            table_ids = [int(table_id) for table_id in raw_table_ids]
        except ValueError as err:
            # Malformed client input is a client error, not a server crash.
            raise HTTPException(
                400, detail=f"Invalid table id(s): {raw_table_ids}"
            ) from err
        table_stats_dpid = self.table_stats_by_dpid_table_id(dpids, table_ids)
        return JSONResponse(table_stats_dpid)
119
120
    @rest('v1/port/stats')
    async def port_stats(self, request: Request) -> JSONResponse:
        """Return the port stats by dpid, and optionally by port.

        Query parameters (both may be repeated):
            dpid: switches to report; defaults to every switch with cached
                port stats.
            port: integer port numbers to report; defaults to all ports.

        Raises:
            HTTPException: 400 when a "port" value is not an integer.
        """
        dpids = request.query_params.getlist("dpid")
        raw_ports = request.query_params.getlist("port")
        try:
            ports = [int(port) for port in raw_ports]
        except ValueError as err:
            # Malformed client input is a client error, not a server crash.
            raise HTTPException(
                400, detail=f"Invalid port number(s): {raw_ports}"
            ) from err
        return JSONResponse(self.port_stats_filter(dpids, ports))
126 1
127 1
    @rest('v1/packet_count/{flow_id}')
    def packet_count(self, request: Request) -> JSONResponse:
        """Report the packet counter and packet rate of a specific flow.

        The rate is ``packet_count / duration_sec`` of the cached flow stats,
        or 0 when the duration is zero.

        Raises:
            HTTPException: 404 when the flow id is not in the cache.
        """
        flow_id = request.path_params["flow_id"]
        flow = self.flow_from_id(flow_id)
        if flow is None:
            raise HTTPException(404, detail="Flow does not exist")
        stats = flow.stats
        try:
            rate = stats.packet_count / stats.duration_sec
        except ZeroDivisionError:
            rate = 0
        return JSONResponse({
            'flow_id': flow_id,
            'packet_counter': stats.packet_count,
            'packet_per_second': rate,
        })
145 1
146 1
    @rest('v1/bytes_count/{flow_id}')
    def bytes_count(self, request: Request) -> JSONResponse:
        """Report the byte counter and bit rate of a specific flow.

        The rate is ``byte_count * 8 / duration_sec`` of the cached flow
        stats, or 0 when the duration is zero.

        Raises:
            HTTPException: 404 when the flow id is not in the cache.
        """
        flow_id = request.path_params["flow_id"]
        flow = self.flow_from_id(flow_id)
        if flow is None:
            raise HTTPException(404, detail="Flow does not exist")
        stats = flow.stats
        try:
            rate = stats.byte_count * 8 / stats.duration_sec
        except ZeroDivisionError:
            rate = 0
        return JSONResponse({
            'flow_id': flow_id,
            'bytes_counter': stats.byte_count,
            'bits_per_second': rate,
        })
164
165
    @rest('v1/packet_count/per_flow/{dpid}')
    def packet_count_per_flow(self, request: Request) -> JSONResponse:
        """Report the packet counter and packet rate of each flow of a switch.

        Delegates to ``flows_counters`` using the 'packet_count' stats field.
        """
        return self.flows_counters(
            'packet_count',
            request.path_params["dpid"],
            counter='packet_counter',
            rate='packet_per_second',
        )
173
174
    @rest('v1/bytes_count/per_flow/{dpid}')
    def bytes_count_per_flow(self, request: Request) -> JSONResponse:
        """Report the byte counter and bit rate of each flow of a switch.

        Delegates to ``flows_counters`` using the 'byte_count' stats field.
        """
        return self.flows_counters(
            'byte_count',
            request.path_params["dpid"],
            counter='bytes_counter',
            rate='bits_per_second',
        )
182 1
183 1
    def flows_counters(self, field, dpid, counter=None, rate=None,
                       total=False) -> JSONResponse:
        """Calculate flow statistics for one switch.

        Args:
            field: key of the per-flow stats entry to read (for example
                'packet_count' or 'byte_count').
            dpid: switch whose cached flows are inspected.
            counter: output key for the raw count; defaults to ``field``.
            rate: output key for the per-second value; defaults to ``field``.
                When it starts with 'bits' the per-second value is
                multiplied by 8.
            total: when true, respond with the sum of ``field`` over all
                flows instead of a per-flow list.

        Returns:
            A JSONResponse holding either the sum (``total`` true) or a list
            of ``{'flow_id', counter, rate}`` dicts, one per flow. A switch
            without cached flows yields 0 or an empty list respectively.
        """
        # We don't have statistics persistence yet, so for now this only works
        # for start and end equals to zero
        switch_flows = self.flow_stats_by_dpid_flow_id([dpid]).get(dpid)

        if total:
            if switch_flows is None:
                return JSONResponse(0)
            return JSONResponse(
                sum(stats[field] for stats in switch_flows.values())
            )

        counter_key = counter or field
        rate_key = rate or field
        per_flow = []
        if switch_flows is None:
            return JSONResponse(per_flow)
        for flow_id, stats in switch_flows.items():
            count = stats[field]
            try:
                per_second = count / stats['duration_sec']
            except ZeroDivisionError:
                # A flow with zero duration has no meaningful rate yet.
                per_second = 0
            if rate_key.startswith('bits'):
                per_second *= 8
            per_flow.append({
                'flow_id': flow_id,
                counter_key: count,
                rate_key: per_second,
            })
        return JSONResponse(per_flow)
220
221
    @listen_to('kytos/of_core.flow_stats.received')
    def on_stats_received(self, event):
        """Listen for flow stats events (OpenFlow 1.3).

        Forwards the event unchanged to ``handle_stats_received``.
        """
        self.handle_stats_received(event)
225 1
226 1
    def handle_stats_received(self, event):
        """Process a flow stats event (OpenFlow 1.3).

        Passes ``event.content['replies_flows']`` on to
        ``handle_stats_reply_received``; events without that key are ignored.
        """
        if 'replies_flows' not in event.content:
            return
        self.handle_stats_reply_received(event.content['replies_flows'])
231
232
    def handle_stats_reply_received(self, replies_flows):
        """Merge the received flows into the flow stats cache.

        Each flow is stored under its ``id``; entries already in the cache
        under other ids are left in place.
        """
        # Build the whole batch first so the cache is updated in one step.
        received = {}
        for flow in replies_flows:
            received[flow.id] = flow
        self.flows_stats_dict.update(received)
235
236
    @listen_to('kytos/of_core.table_stats.received')
    def on_table_stats_received(self, event):
        """Listen for table stats events (OpenFlow 1.3).

        Forwards the event unchanged to ``handle_table_stats_received``.
        """
        self.handle_table_stats_received(event)
240
241
    def handle_table_stats_received(self, event):
        """Handle table stats messages for OpenFlow 1.3.

        Passes ``event.content['replies_tables']`` on to
        ``handle_table_stats_reply_received``. Events without that key are
        ignored, matching how ``handle_stats_received`` treats flow events
        that lack their payload.
        """
        if 'replies_tables' not in event.content:
            return
        replies_tables = event.content['replies_tables']
        self.handle_table_stats_reply_received(replies_tables)
245
246
    def handle_table_stats_reply_received(self, replies_tables):
        """Store the received tables in the table stats cache.

        Each table is filed under ``table.switch.id`` and then under its
        ``table_id``, replacing any entry previously stored there.
        """
        for table in replies_tables:
            switch_tables = self.tables_stats_dict.setdefault(
                table.switch.id, {}
            )
            switch_tables[table.table_id] = table
253
254
    @listen_to('kytos/of_core.port_stats')
    def on_port_stats(self, event):
        """Listen for port stats events (OpenFlow 1.3).

        Forwards the event unchanged to ``handle_port_stats``.
        """
        self.handle_port_stats(event)
258
259
    def handle_port_stats(self, event):
        """Record the port counters carried by a port stats event.

        Reads 'port_stats' and 'switch' from ``event.content`` and returns
        without changes when either is missing or falsy. Otherwise each port
        is stored in ``port_stats_dict[switch.id][port_no]`` as a plain dict
        holding the ``.value`` of every counter listed below.
        """
        content = event.content
        port_stats = content.get('port_stats')
        switch = content.get('switch')
        if not (port_stats and switch):
            return
        # Counter attributes copied from each port entry, in output order.
        counters = (
            "port_no",
            "rx_packets",
            "tx_packets",
            "rx_bytes",
            "tx_bytes",
            "rx_dropped",
            "tx_dropped",
            "rx_errors",
            "tx_errors",
            "rx_frame_err",
            "rx_over_err",
            "rx_crc_err",
            "collisions",
            "duration_sec",
            "duration_nsec",
        )
        for port in port_stats:
            switch_ports = self.port_stats_dict.setdefault(switch.id, {})
            snapshot = {name: getattr(port, name).value for name in counters}
            switch_ports[port.port_no.value] = snapshot
284