Test Failed
Pull Request — master (#25)
by Vinicius
04:08
created

build.main.Main.packet_count()   A

Complexity

Conditions 3

Size

Total Lines 18
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 3.0416

Importance

Changes 0
Metric Value
eloc 16
dl 0
loc 18
ccs 10
cts 12
cp 0.8333
rs 9.6
c 0
b 0
f 0
cc 3
nop 2
crap 3.0416
1
"""Main module of amlight/kytos_stats Kytos Network Application.
2
3
This NApp does operations with flows not covered by Kytos itself.
4
"""
5
# pylint: disable=too-many-return-statements,too-many-instance-attributes
6
# pylint: disable=too-many-arguments,too-many-branches,too-many-statements
7
8 1
from collections import defaultdict
9 1
10 1
from kytos.core import KytosNApp, log, rest
11
from kytos.core.helpers import listen_to
12
from kytos.core.rest_api import HTTPException, JSONResponse, Request
13
14 1
15
# pylint: disable=too-many-public-methods
16
class Main(KytosNApp):
17
    """Main class of amlight/kytos_stats NApp.
18
    This class is the entry point for this napp.
19 1
    """
20
21
    def setup(self):
        """Initialise the NApp state in place of an ``__init__`` method.

        The controller calls this method automatically when the
        application is loaded. It logs a start-up message and creates the
        in-memory stores that the other methods of this class read and
        write.
        """
        log.info('Starting Kytos/Amlight flow manager')
        # port stats data by dpid by port_no
        self.port_stats_dict: dict[str, dict[int, dict]] = defaultdict(dict)
        # flow objects by flow id, and table stats by switch id by table id
        self.flows_stats_dict, self.tables_stats_dict = {}, {}
32
33
    def execute(self):
34
        """This method is executed right after the setup method execution.
35
        You can also use this method in loop mode if you add to the above setup
36 1
        method a line like the following example:
37
            self.execute_as_loop(30)  # 30-second interval.
38
        """
39
40
    def shutdown(self):
41 1
        """This method is executed when your napp is unloaded.
42
        If you have some cleanup procedure, insert it here.
43 1
        """
44
45 1
    def flow_from_id(self, flow_id):
46
        """Flow from given flow_id."""
47
        return self.flows_stats_dict.get(flow_id)
48 1
49 1
    def flow_stats_by_dpid_flow_id(self, dpids):
50 1
        """ Auxiliar funcion for v1/flow/stats endpoint implementation.
51
        """
52
        flow_stats_by_id = {}
53
        flows_stats_dict_copy = self.flows_stats_dict.copy()
54
        for flow_id, flow in flows_stats_dict_copy.items():
55
            dpid = flow.switch.dpid
56
            if dpid in dpids:
57
                if dpid not in flow_stats_by_id:
58
                    flow_stats_by_id[dpid] = {}
59
                info_flow_as_dict = flow.stats.as_dict()
60 1
                info_flow_as_dict.update({"cookie": flow.cookie})
61
                info_flow_as_dict.update({"priority": flow.priority})
62 1
                info_flow_as_dict.update({"match": flow.match.as_dict()})
63
                flow_stats_by_id[dpid].update({flow_id: info_flow_as_dict})
64
        return flow_stats_by_id
65
66
    def table_stats_by_dpid_table_id(self, dpids, table_ids):
67
        """ Auxiliar funcion for v1/table/stats endpoint implementation.
68
        """
69
        table_stats_by_id = {}
70
        tables_stats_dict_copy = self.tables_stats_dict.copy()
71
        for dpid, tables in tables_stats_dict_copy.items():
72
            if dpid not in dpids:
73
                continue
74
            table_stats_by_id[dpid] = {}
75
            if len(table_ids) == 0:
76
                table_ids = list(tables.keys())
77
            for table_id, table in tables.items():
78
                if table_id in table_ids:
79
                    table_dict = table.as_dict()
80 1
                    del table_dict['switch']
81 1
                    table_stats_by_id[dpid][table_id] = table_dict
82
        return table_stats_by_id
83
84
    def port_stats_filter(self, f_dpids: list[str], f_ports: list[int]) -> dict:
85 1
        """ Auxiliar funcion for v1/port/stats endpoint implementation.
86 1
        """
87 1
        port_stats = {}
88 1
        dpid_keys = (
89 1
            (dpid for dpid in f_dpids if dpid in self.port_stats_dict)
90
            if f_dpids
91 1
            else self.port_stats_dict.keys()
92 1
        )
93
        for dpid in dpid_keys:
94
            port_stats[dpid] = {}
95
            port_keys = f_ports
96 1
            if not f_ports:
97 1
                port_keys = self.port_stats_dict[dpid].keys()
98 1
            for port_no in port_keys:
99 1
                if p_stat := self.port_stats_dict[dpid].get(port_no):
100 1
                    port_stats[dpid][port_no] = p_stat
101 1
        return port_stats
102 1
103
    @rest('v1/flow/stats')
    def flow_stats(self, request: Request) -> JSONResponse:
        """Return the flow stats grouped by dpid.

        The ``dpid`` query parameter may be repeated. When it is absent,
        the stats of the flows of all switches known to the controller
        are returned.
        """
        dpids = request.query_params.getlist("dpid")
        if not dpids:
            switches = self.controller.switches.values()
            dpids = [switch.dpid for switch in switches]
        return JSONResponse(self.flow_stats_by_dpid_flow_id(dpids))
113
114
    @rest('v1/table/stats')
    def table_stats(self, request: Request) -> JSONResponse:
        """Return the table stats by dpid, and optionally by table_id.

        The ``dpid`` and ``table`` query parameters may be repeated. When
        ``dpid`` is absent, all switches known to the controller are used.
        When ``table`` is absent, no table filter is applied.

        Raises:
            HTTPException: 400 when a ``table`` value is not an integer.
        """
        dpids = request.query_params.getlist("dpid")
        if not dpids:
            dpids = [sw.dpid for sw in self.controller.switches.values()]
        raw_table_ids = request.query_params.getlist("table")
        try:
            table_ids = [int(table_id) for table_id in raw_table_ids]
        except ValueError as err:
            # A malformed query value is a client error, not a server fault.
            raise HTTPException(
                400,
                detail=f"Invalid table id in {raw_table_ids}: "
                       "expected integers"
            ) from err
        table_stats_dpid = self.table_stats_by_dpid_table_id(dpids, table_ids)
        return JSONResponse(table_stats_dpid)
126 1
127 1
    @rest('v1/port/stats')
    async def port_stats(self, request: Request) -> JSONResponse:
        """Return the port stats by dpid, and optionally by port.

        The ``dpid`` and ``port`` query parameters may be repeated; an
        absent parameter means no filtering on that dimension.

        Raises:
            HTTPException: 400 when a ``port`` value is not an integer.
        """
        dpids = request.query_params.getlist("dpid")
        raw_ports = request.query_params.getlist("port")
        try:
            ports = [int(port) for port in raw_ports]
        except ValueError as err:
            # A malformed query value is a client error, not a server fault.
            raise HTTPException(
                400,
                detail=f"Invalid port number in {raw_ports}: "
                       "expected integers"
            ) from err
        return JSONResponse(self.port_stats_filter(dpids, ports))
133
134
    @rest('v1/packet_count/{flow_id}')
    def packet_count(self, request: Request) -> JSONResponse:
        """Packet count of a specific flow.

        Responds with the flow id, its packet counter, and the counter
        divided by the flow's ``duration_sec``.

        Raises:
            HTTPException: 404 when no flow is stored under the given id.
        """
        flow_id = request.path_params["flow_id"]
        flow = self.flow_from_id(flow_id)
        if flow is None:
            raise HTTPException(404, detail="Flow does not exist")
        stats = flow.stats
        try:
            rate = stats.packet_count / stats.duration_sec
        except ZeroDivisionError:
            # Zero duration: report a rate of 0 instead of failing.
            rate = 0
        return JSONResponse({
            'flow_id': flow_id,
            'packet_counter': stats.packet_count,
            'packet_per_second': rate,
        })
152 1
153
    @rest('v1/bytes_count/{flow_id}')
    def bytes_count(self, request: Request) -> JSONResponse:
        """Bytes count of a specific flow.

        Responds with the flow id, its byte counter, and the bit rate
        computed as ``byte_count * 8 / duration_sec``.

        Raises:
            HTTPException: 404 when no flow is stored under the given id.
        """
        flow_id = request.path_params["flow_id"]
        flow = self.flow_from_id(flow_id)
        if flow is None:
            raise HTTPException(404, detail="Flow does not exist")
        stats = flow.stats
        try:
            rate = stats.byte_count * 8 / stats.duration_sec
        except ZeroDivisionError:
            # Zero duration: report a rate of 0 instead of failing.
            rate = 0
        return JSONResponse({
            'flow_id': flow_id,
            'bytes_counter': stats.byte_count,
            'bits_per_second': rate,
        })
171
172 1
    @rest('v1/packet_count/per_flow/{dpid}')
    def packet_count_per_flow(self, request: Request) -> JSONResponse:
        """Per flow packet count for the switch given in the path."""
        output_keys = {
            'counter': 'packet_counter',
            'rate': 'packet_per_second',
        }
        switch_dpid = request.path_params["dpid"]
        return self.flows_counters('packet_count', switch_dpid, **output_keys)
180 1
181 1
    @rest('v1/bytes_count/per_flow/{dpid}')
    def bytes_count_per_flow(self, request: Request) -> JSONResponse:
        """Per flow bytes count for the switch given in the path."""
        output_keys = {
            'counter': 'bytes_counter',
            'rate': 'bits_per_second',
        }
        switch_dpid = request.path_params["dpid"]
        return self.flows_counters('byte_count', switch_dpid, **output_keys)
189 1
190 1
    def flows_counters(self, field, dpid, counter=None, rate=None,
                       total=False) -> JSONResponse:
        """Calculate flows statistics for one switch.

        ``field`` is the key read from each flow's stats dict. With
        ``total`` set, the response is the sum of that field over the
        switch's flows (0 when the switch has none). Otherwise it is a
        list with one dict per flow holding the flow id, the count under
        the ``counter`` key and the per-second value under the ``rate``
        key. Both keys default to ``field``. Rates whose key starts with
        'bits' are multiplied by 8.
        """
        # We don't have statistics persistence yet, so for now this only works
        # for start and end equals to zero
        switch_flows = self.flow_stats_by_dpid_flow_id([dpid]).get(dpid)

        if total:
            if switch_flows is None:
                return JSONResponse(0)
            summed = 0
            for stats in switch_flows.values():
                summed += stats[field]
            return JSONResponse(summed)

        counter = counter or field
        rate = rate or field
        per_flow = []
        if switch_flows is None:
            return JSONResponse(per_flow)

        in_bits = rate.startswith('bits')
        for flow_id, stats in switch_flows.items():
            count = stats[field]
            try:
                per_second = count / stats['duration_sec']
            except ZeroDivisionError:
                # Zero duration: report a rate of 0 instead of failing.
                per_second = 0
            if in_bits:
                per_second *= 8
            per_flow.append({'flow_id': flow_id,
                             counter: count,
                             rate: per_second})
        return JSONResponse(per_flow)
227 1
228 1
    @listen_to('kytos/of_core.flow_stats.received')
    def on_stats_received(self, event):
        """Listener for flow stats events (OpenFlow 1.3).

        Delegates to ``handle_stats_received``.
        """
        self.handle_stats_received(event=event)
232
233
    def handle_stats_received(self, event):
234
        """Handle flow stats messages for OpenFlow 1.3."""
235
        if 'replies_flows' in event.content:
236
            replies_flows = event.content['replies_flows']
237
            self.handle_stats_reply_received(replies_flows)
238
239
    def handle_stats_reply_received(self, replies_flows):
240
        """Update the set of flows stats"""
241
        self.flows_stats_dict.update({flow.id: flow for flow in replies_flows})
242
243
    @listen_to('kytos/of_core.table_stats.received')
    def on_table_stats_received(self, event):
        """Listener for table stats events (OpenFlow 1.3).

        Delegates to ``handle_table_stats_received``.
        """
        self.handle_table_stats_received(event=event)
247
248
    def handle_table_stats_received(self, event):
249
        """Handle table stats messages for OpenFlow 1.3."""
250
        replies_tables = event.content['replies_tables']
251
        self.handle_table_stats_reply_received(replies_tables)
252
253
    def handle_table_stats_reply_received(self, replies_tables):
254
        """Update the set of tables stats"""
255
        for table in replies_tables:
256
            switch_id = table.switch.id
257
            if switch_id not in self.tables_stats_dict:
258
                self.tables_stats_dict[switch_id] = {}
259
            self.tables_stats_dict[switch_id][table.table_id] = table
260
261
    @listen_to('kytos/of_core.port_stats')
    def on_port_stats(self, event):
        """Listener for port stats events (OpenFlow 1.3).

        Delegates to ``handle_port_stats``.
        """
        self.handle_port_stats(event=event)
265
266
    def handle_port_stats(self, event):
267
        """Handle port stats messages for OpenFlow 1.3."""
268
        port_stats = event.content.get('port_stats')
269
        switch = event.content.get('switch')
270
        if not port_stats or not switch:
271
            return
272
        for port in port_stats:
273
            self.port_stats_dict[switch.id][port.port_no.value] = {
274
                "port_no": port.port_no.value,
275
                "rx_packets": port.rx_packets.value,
276
                "tx_packets": port.tx_packets.value,
277
                "rx_bytes": port.rx_bytes.value,
278
                "tx_bytes": port.tx_bytes.value,
279
                "rx_dropped": port.rx_dropped.value,
280
                "tx_dropped": port.tx_dropped.value,
281
                "rx_errors": port.rx_errors.value,
282
                "tx_errors": port.tx_errors.value,
283
                "rx_frame_err": port.rx_frame_err.value,
284
                "rx_over_err": port.rx_over_err.value,
285
                "rx_crc_err": port.rx_crc_err.value,
286
                "collisions": port.collisions.value,
287
                "duration_sec": port.duration_sec.value,
288
                "duration_nsec": port.duration_nsec.value,
289
            }
290