Test Failed
Pull Request — master (#25)
by Vinicius
03:58
created

build.main.Main.table_stats_by_dpid_table_id()   B

Complexity

Conditions 6

Size

Total Lines 17
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 35.2661

Importance

Changes 0
Metric Value
eloc 15
dl 0
loc 17
rs 8.6666
c 0
b 0
f 0
ccs 1
cts 15
cp 0.0667
cc 6
nop 3
crap 35.2661
1
"""Main module of amlight/kytos_stats Kytos Network Application.
2
3
This NApp does operations with flows not covered by Kytos itself.
4
"""
5
# pylint: disable=too-many-return-statements,too-many-instance-attributes
6
# pylint: disable=too-many-arguments,too-many-branches,too-many-statements
7
8 1
from collections import defaultdict
9 1
10 1
from kytos.core import KytosNApp, log, rest
11
from kytos.core.events import KytosEvent
12
from kytos.core.helpers import alisten_to, listen_to
13
from kytos.core.rest_api import HTTPException, JSONResponse, Request
14 1
15
16
# pylint: disable=too-many-public-methods
17
class Main(KytosNApp):
18
    """Main class of amlight/kytos_stats NApp.
19 1
    This class is the entry point for this napp.
20
    """
21
22
    def setup(self):
23
        """Replace the '__init__' method for the KytosNApp subclass.
24
        The setup method is automatically called by the controller when your
25 1
        application is loaded.
26 1
        So, if you have any setup routine, insert it here.
27 1
        """
28
        log.info('Starting Kytos/Amlight flow manager')
29 1
        self.flows_stats_dict = {}
30
        self.tables_stats_dict = {}
31
        # port stats data by dpid by port_no
32
        self.port_stats_dict: dict[str, dict[int, dict]] = defaultdict(dict)
33
34
    def execute(self):
35
        """This method is executed right after the setup method execution.
36 1
        You can also use this method in loop mode if you add to the above setup
37
        method a line like the following example:
38
            self.execute_as_loop(30)  # 30-second interval.
39
        """
40
41 1
    def shutdown(self):
42
        """This method is executed when your napp is unloaded.
43 1
        If you have some cleanup procedure, insert it here.
44
        """
45 1
46
    def flow_from_id(self, flow_id):
47
        """Flow from given flow_id."""
48 1
        return self.flows_stats_dict.get(flow_id)
49 1
50 1
    def flow_stats_by_dpid_flow_id(self, dpids):
51
        """ Auxiliar funcion for v1/flow/stats endpoint implementation.
52
        """
53
        flow_stats_by_id = {}
54
        flows_stats_dict_copy = self.flows_stats_dict.copy()
55
        for flow_id, flow in flows_stats_dict_copy.items():
56
            dpid = flow.switch.dpid
57
            if dpid in dpids:
58
                if dpid not in flow_stats_by_id:
59
                    flow_stats_by_id[dpid] = {}
60 1
                info_flow_as_dict = flow.stats.as_dict()
61
                info_flow_as_dict.update({"cookie": flow.cookie})
62 1
                info_flow_as_dict.update({"priority": flow.priority})
63
                info_flow_as_dict.update({"match": flow.match.as_dict()})
64
                flow_stats_by_id[dpid].update({flow_id: info_flow_as_dict})
65
        return flow_stats_by_id
66
67
    def table_stats_by_dpid_table_id(self, dpids, table_ids):
68
        """ Auxiliar funcion for v1/table/stats endpoint implementation.
69
        """
70
        table_stats_by_id = {}
71
        tables_stats_dict_copy = self.tables_stats_dict.copy()
72
        for dpid, tables in tables_stats_dict_copy.items():
73
            if dpid not in dpids:
74
                continue
75
            table_stats_by_id[dpid] = {}
76
            if len(table_ids) == 0:
77
                table_ids = list(tables.keys())
78
            for table_id, table in tables.items():
79
                if table_id in table_ids:
80 1
                    table_dict = table.as_dict()
81 1
                    del table_dict['switch']
82
                    table_stats_by_id[dpid][table_id] = table_dict
83
        return table_stats_by_id
84
85 1
    def port_stats_filter(
86 1
        self, f_dpids: list[str], f_ports: list[int]
87 1
    ) -> dict:
88 1
        """ Auxiliar funcion for v1/port/stats endpoint implementation.
89 1
        """
90
        port_stats = {}
91 1
        dpid_keys = (
92 1
            (dpid for dpid in f_dpids if dpid in self.port_stats_dict)
93
            if f_dpids
94
            else self.port_stats_dict.keys()
95
        )
96 1
        for dpid in dpid_keys:
97 1
            port_stats[dpid] = {}
98 1
            port_keys = f_ports
99 1
            if not f_ports:
100 1
                port_keys = self.port_stats_dict[dpid].keys()
101 1
            for port_no in port_keys:
102 1
                if p_stat := self.port_stats_dict[dpid].get(port_no):
103
                    port_stats[dpid][port_no] = p_stat
104 1
        return port_stats
105 1
106
    @rest('v1/flow/stats')
107 1
    def flow_stats(self, request: Request) -> JSONResponse:
108 1
        """Return the flows stats by dpid.
109 1
        Return the stats of all flows if dpid is None
110 1
        """
111 1
        dpids = request.query_params.getlist("dpid")
112 1
        if len(dpids) == 0:
113
            dpids = [sw.dpid for sw in self.controller.switches.values()]
114
        flow_stats_by_id = self.flow_stats_by_dpid_flow_id(dpids)
115
        return JSONResponse(flow_stats_by_id)
116 1
117
    @rest('v1/table/stats')
118
    def table_stats(self, request: Request) -> JSONResponse:
119
        """Return the table stats by dpid,
120
        and optionally by table_id.
121 1
        """
122
        dpids = request.query_params.getlist("dpid")
123 1
        if len(dpids) == 0:
124 1
            dpids = [sw.dpid for sw in self.controller.switches.values()]
125
        table_ids = request.query_params.getlist("table")
126 1
        table_ids = list(map(int, table_ids))
127 1
        table_stats_dpid = self.table_stats_by_dpid_table_id(dpids, table_ids)
128 1
        return JSONResponse(table_stats_dpid)
129 1
130 1
    @rest('v1/port/stats')
131 1
    async def port_stats(self, request: Request) -> JSONResponse:
132
        """Return the port stats by dpid, and optionally by port."""
133
        dpids = request.query_params.getlist("dpid")
134
        try:
135 1
            ports = list(map(int, request.query_params.getlist("port")))
136
        except (ValueError, TypeError):
137
            detail = "'port' value is supposed to be an integer"
138
            raise HTTPException(400, detail=detail)
139
        return JSONResponse(self.port_stats_filter(dpids, ports))
140 1
141
    @rest('v1/packet_count/{flow_id}')
142 1
    def packet_count(self, request: Request) -> JSONResponse:
143 1
        """Packet count of an specific flow."""
144
        flow_id = request.path_params["flow_id"]
145 1
        flow = self.flow_from_id(flow_id)
146 1
        if flow is None:
147
            raise HTTPException(404, detail="Flow does not exist")
148
        try:
149
            packet_per_second = \
150
                flow.stats.packet_count / flow.stats.duration_sec
151 1
        except ZeroDivisionError:
152 1
            packet_per_second = 0
153
        packet_stats = {
154 1
            'flow_id': flow_id,
155 1
            'packet_counter': flow.stats.packet_count,
156
            'packet_per_second': packet_per_second
157
            }
158
        return JSONResponse(packet_stats)
159
160 1
    @rest('v1/bytes_count/{flow_id}')
161
    def bytes_count(self, request: Request) -> JSONResponse:
162
        """Bytes count of an specific flow."""
163
        flow_id = request.path_params["flow_id"]
164
        flow = self.flow_from_id(flow_id)
165
        if flow is None:
166 1
            raise HTTPException(404, detail="Flow does not exist")
167
        try:
168
            bits_per_second = \
169 1
                flow.stats.byte_count * 8 / flow.stats.duration_sec
170 1
        except ZeroDivisionError:
171
            bits_per_second = 0
172 1
        bytes_stats = {
173
            'flow_id': flow_id,
174
            'bytes_counter': flow.stats.byte_count,
175
            'bits_per_second': bits_per_second
176
            }
177 1
        return JSONResponse(bytes_stats)
178 1
179
    @rest('v1/packet_count/per_flow/{dpid}')
180 1
    def packet_count_per_flow(self, request: Request) -> JSONResponse:
181 1
        """Per flow packet count."""
182 1
        dpid = request.path_params["dpid"]
183 1
        return self.flows_counters('packet_count',
184 1
                                   dpid,
185
                                   counter='packet_counter',
186
                                   rate='packet_per_second')
187 1
188 1
    @rest('v1/bytes_count/per_flow/{dpid}')
189 1
    def bytes_count_per_flow(self, request: Request) -> JSONResponse:
190 1
        """Per flow bytes count."""
191 1
        dpid = request.path_params["dpid"]
192 1
        return self.flows_counters('byte_count',
193 1
                                   dpid,
194
                                   counter='bytes_counter',
195
                                   rate='bits_per_second')
196 1
197
    def flows_counters(self, field, dpid, counter=None, rate=None,
198 1
                       total=False) -> JSONResponse:
199 1
        """Calculate flows statistics.
200
        The returned statistics are both per flow and for the sum of flows
201
        """
202
203 1
        if total:
204
            count_flows = 0
205 1
        else:
206 1
            count_flows = []
207 1
            if not counter:
208
                counter = field
209 1
            if not rate:
210
                rate = field
211 1
212
        # We don't have statistics persistence yet, so for now this only works
213 1
        # for start and end equals to zero
214 1
        flows = self.flow_stats_by_dpid_flow_id([dpid])
215
        flows = flows.get(dpid)
216
217
        if flows is None:
218 1
            return JSONResponse(count_flows)
219
        for flow_id, stats in flows.items():
220
            count = stats[field]
221
            if total:
222
                count_flows += count
223 1
            else:
224
                try:
225 1
                    per_second = count / stats['duration_sec']
226 1
                except ZeroDivisionError:
227 1
                    per_second = 0
228 1
                if rate.startswith('bits'):
229 1
                    per_second *= 8
230
                count_flows.append({'flow_id': flow_id,
231
                                    counter: count,
232
                                    rate: per_second})
233
        return JSONResponse(count_flows)
234
235
    @listen_to('kytos/of_core.flow_stats.received')
236
    def on_stats_received(self, event):
237
        """Capture flow stats messages for OpenFlow 1.3."""
238
        self.handle_stats_received(event)
239
240
    def handle_stats_received(self, event):
241
        """Handle flow stats messages for OpenFlow 1.3."""
242
        if 'replies_flows' in event.content:
243
            replies_flows = event.content['replies_flows']
244
            self.handle_stats_reply_received(replies_flows)
245
246
    def handle_stats_reply_received(self, replies_flows):
247
        """Update the set of flows stats"""
248
        self.flows_stats_dict.update({flow.id: flow for flow in replies_flows})
249
250
    @listen_to('kytos/of_core.table_stats.received')
251
    def on_table_stats_received(self, event):
252
        """Capture table stats messages for OpenFlow 1.3."""
253
        self.handle_table_stats_received(event)
254
255
    def handle_table_stats_received(self, event):
256
        """Handle table stats messages for OpenFlow 1.3."""
257
        replies_tables = event.content['replies_tables']
258
        self.handle_table_stats_reply_received(replies_tables)
259
260
    def handle_table_stats_reply_received(self, replies_tables):
261
        """Update the set of tables stats"""
262
        for table in replies_tables:
263
            switch_id = table.switch.id
264
            if switch_id not in self.tables_stats_dict:
265
                self.tables_stats_dict[switch_id] = {}
266
            self.tables_stats_dict[switch_id][table.table_id] = table
267
268
    @alisten_to('kytos/of_core.port_stats')
269
    async def on_port_stats(self, event: KytosEvent) -> None:
270
        """Handle port stats messages for OpenFlow 1.3."""
271
        port_stats = event.content.get('port_stats')
272
        switch = event.content.get('switch')
273
        if not port_stats or not switch:
274
            return
275
        for port in port_stats:
276
            self.port_stats_dict[switch.id][port.port_no.value] = {
277
                "port_no": port.port_no.value,
278
                "rx_packets": port.rx_packets.value,
279
                "tx_packets": port.tx_packets.value,
280
                "rx_bytes": port.rx_bytes.value,
281
                "tx_bytes": port.tx_bytes.value,
282
                "rx_dropped": port.rx_dropped.value,
283
                "tx_dropped": port.tx_dropped.value,
284
                "rx_errors": port.rx_errors.value,
285
                "tx_errors": port.tx_errors.value,
286
                "rx_frame_err": port.rx_frame_err.value,
287
                "rx_over_err": port.rx_over_err.value,
288
                "rx_crc_err": port.rx_crc_err.value,
289
                "collisions": port.collisions.value,
290
                "duration_sec": port.duration_sec.value,
291
                "duration_nsec": port.duration_nsec.value,
292
            }
293