Test Failed
Pull Request — master (#25)
by Vinicius
03:49
created

build.main.Main.handle_stats_reply_received()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
eloc 2
dl 0
loc 3
rs 10
c 0
b 0
f 0
ccs 2
cts 2
cp 1
cc 1
nop 2
crap 1
1
"""Main module of amlight/kytos_stats Kytos Network Application.
2
3
This NApp does operations with flows not covered by Kytos itself.
4
"""
5
# pylint: disable=too-many-return-statements,too-many-instance-attributes
6
# pylint: disable=too-many-arguments,too-many-branches,too-many-statements
7
8 1
from collections import defaultdict
9 1
10 1
from kytos.core import KytosNApp, log, rest
11
from kytos.core.events import KytosEvent
12
from kytos.core.helpers import listen_to, alisten_to
13
from kytos.core.rest_api import HTTPException, JSONResponse, Request
14 1
15
16
# pylint: disable=too-many-public-methods
17
class Main(KytosNApp):
18
    """Main class of amlight/kytos_stats NApp.
19 1
    This class is the entry point for this napp.
20
    """
21
22
    def setup(self):
23
        """Replace the '__init__' method for the KytosNApp subclass.
24
        The setup method is automatically called by the controller when your
25 1
        application is loaded.
26 1
        So, if you have any setup routine, insert it here.
27 1
        """
28
        log.info('Starting Kytos/Amlight flow manager')
29 1
        self.flows_stats_dict = {}
30
        self.tables_stats_dict = {}
31
        # port stats data by dpid by port_no
32
        self.port_stats_dict: dict[str, dict[int, dict]] = defaultdict(dict)
33
34
    def execute(self):
35
        """This method is executed right after the setup method execution.
36 1
        You can also use this method in loop mode if you add to the above setup
37
        method a line like the following example:
38
            self.execute_as_loop(30)  # 30-second interval.
39
        """
40
41 1
    def shutdown(self):
42
        """This method is executed when your napp is unloaded.
43 1
        If you have some cleanup procedure, insert it here.
44
        """
45 1
46
    def flow_from_id(self, flow_id):
47
        """Flow from given flow_id."""
48 1
        return self.flows_stats_dict.get(flow_id)
49 1
50 1
    def flow_stats_by_dpid_flow_id(self, dpids):
51
        """ Auxiliar funcion for v1/flow/stats endpoint implementation.
52
        """
53
        flow_stats_by_id = {}
54
        flows_stats_dict_copy = self.flows_stats_dict.copy()
55
        for flow_id, flow in flows_stats_dict_copy.items():
56
            dpid = flow.switch.dpid
57
            if dpid in dpids:
58
                if dpid not in flow_stats_by_id:
59
                    flow_stats_by_id[dpid] = {}
60 1
                info_flow_as_dict = flow.stats.as_dict()
61
                info_flow_as_dict.update({"cookie": flow.cookie})
62 1
                info_flow_as_dict.update({"priority": flow.priority})
63
                info_flow_as_dict.update({"match": flow.match.as_dict()})
64
                flow_stats_by_id[dpid].update({flow_id: info_flow_as_dict})
65
        return flow_stats_by_id
66
67
    def table_stats_by_dpid_table_id(self, dpids, table_ids):
68
        """ Auxiliar funcion for v1/table/stats endpoint implementation.
69
        """
70
        table_stats_by_id = {}
71
        tables_stats_dict_copy = self.tables_stats_dict.copy()
72
        for dpid, tables in tables_stats_dict_copy.items():
73
            if dpid not in dpids:
74
                continue
75
            table_stats_by_id[dpid] = {}
76
            if len(table_ids) == 0:
77
                table_ids = list(tables.keys())
78
            for table_id, table in tables.items():
79
                if table_id in table_ids:
80 1
                    table_dict = table.as_dict()
81 1
                    del table_dict['switch']
82
                    table_stats_by_id[dpid][table_id] = table_dict
83
        return table_stats_by_id
84
85 1
    def port_stats_filter(self, f_dpids: list[str], f_ports: list[int]) -> dict:
86 1
        """ Auxiliar funcion for v1/port/stats endpoint implementation.
87 1
        """
88 1
        port_stats = {}
89 1
        dpid_keys = (
90
            (dpid for dpid in f_dpids if dpid in self.port_stats_dict)
91 1
            if f_dpids
92 1
            else self.port_stats_dict.keys()
93
        )
94
        for dpid in dpid_keys:
95
            port_stats[dpid] = {}
96 1
            port_keys = f_ports
97 1
            if not f_ports:
98 1
                port_keys = self.port_stats_dict[dpid].keys()
99 1
            for port_no in port_keys:
100 1
                if p_stat := self.port_stats_dict[dpid].get(port_no):
101 1
                    port_stats[dpid][port_no] = p_stat
102 1
        return port_stats
103
104 1
    @rest('v1/flow/stats')
105 1
    def flow_stats(self, request: Request) -> JSONResponse:
106
        """Return the flows stats by dpid.
107 1
        Return the stats of all flows if dpid is None
108 1
        """
109 1
        dpids = request.query_params.getlist("dpid")
110 1
        if len(dpids) == 0:
111 1
            dpids = [sw.dpid for sw in self.controller.switches.values()]
112 1
        flow_stats_by_id = self.flow_stats_by_dpid_flow_id(dpids)
113
        return JSONResponse(flow_stats_by_id)
114
115
    @rest('v1/table/stats')
116 1
    def table_stats(self, request: Request) -> JSONResponse:
117
        """Return the table stats by dpid,
118
        and optionally by table_id.
119
        """
120
        dpids = request.query_params.getlist("dpid")
121 1
        if len(dpids) == 0:
122
            dpids = [sw.dpid for sw in self.controller.switches.values()]
123 1
        table_ids = request.query_params.getlist("table")
124 1
        table_ids = list(map(int, table_ids))
125
        table_stats_dpid = self.table_stats_by_dpid_table_id(dpids, table_ids)
126 1
        return JSONResponse(table_stats_dpid)
127 1
128 1
    @rest('v1/port/stats')
129 1
    async def port_stats(self, request: Request) -> JSONResponse:
130 1
        """Return the port stats by dpid, and optionally by port."""
131 1
        dpids = request.query_params.getlist("dpid")
132
        ports = list(map(int, request.query_params.getlist("port")))
133
        return JSONResponse(self.port_stats_filter(dpids, ports))
134
135 1
    @rest('v1/packet_count/{flow_id}')
136
    def packet_count(self, request: Request) -> JSONResponse:
137
        """Packet count of an specific flow."""
138
        flow_id = request.path_params["flow_id"]
139
        flow = self.flow_from_id(flow_id)
140 1
        if flow is None:
141
            raise HTTPException(404, detail="Flow does not exist")
142 1
        try:
143 1
            packet_per_second = \
144
                flow.stats.packet_count / flow.stats.duration_sec
145 1
        except ZeroDivisionError:
146 1
            packet_per_second = 0
147
        packet_stats = {
148
            'flow_id': flow_id,
149
            'packet_counter': flow.stats.packet_count,
150
            'packet_per_second': packet_per_second
151 1
            }
152 1
        return JSONResponse(packet_stats)
153
154 1
    @rest('v1/bytes_count/{flow_id}')
155 1
    def bytes_count(self, request: Request) -> JSONResponse:
156
        """Bytes count of an specific flow."""
157
        flow_id = request.path_params["flow_id"]
158
        flow = self.flow_from_id(flow_id)
159
        if flow is None:
160 1
            raise HTTPException(404, detail="Flow does not exist")
161
        try:
162
            bits_per_second = \
163
                flow.stats.byte_count * 8 / flow.stats.duration_sec
164
        except ZeroDivisionError:
165
            bits_per_second = 0
166 1
        bytes_stats = {
167
            'flow_id': flow_id,
168
            'bytes_counter': flow.stats.byte_count,
169 1
            'bits_per_second': bits_per_second
170 1
            }
171
        return JSONResponse(bytes_stats)
172 1
173
    @rest('v1/packet_count/per_flow/{dpid}')
174
    def packet_count_per_flow(self, request: Request) -> JSONResponse:
175
        """Per flow packet count."""
176
        dpid = request.path_params["dpid"]
177 1
        return self.flows_counters('packet_count',
178 1
                                   dpid,
179
                                   counter='packet_counter',
180 1
                                   rate='packet_per_second')
181 1
182 1
    @rest('v1/bytes_count/per_flow/{dpid}')
183 1
    def bytes_count_per_flow(self, request: Request) -> JSONResponse:
184 1
        """Per flow bytes count."""
185
        dpid = request.path_params["dpid"]
186
        return self.flows_counters('byte_count',
187 1
                                   dpid,
188 1
                                   counter='bytes_counter',
189 1
                                   rate='bits_per_second')
190 1
191 1
    def flows_counters(self, field, dpid, counter=None, rate=None,
192 1
                       total=False) -> JSONResponse:
193 1
        """Calculate flows statistics.
194
        The returned statistics are both per flow and for the sum of flows
195
        """
196 1
197
        if total:
198 1
            count_flows = 0
199 1
        else:
200
            count_flows = []
201
            if not counter:
202
                counter = field
203 1
            if not rate:
204
                rate = field
205 1
206 1
        # We don't have statistics persistence yet, so for now this only works
207 1
        # for start and end equals to zero
208
        flows = self.flow_stats_by_dpid_flow_id([dpid])
209 1
        flows = flows.get(dpid)
210
211 1
        if flows is None:
212
            return JSONResponse(count_flows)
213 1
        for flow_id, stats in flows.items():
214 1
            count = stats[field]
215
            if total:
216
                count_flows += count
217
            else:
218 1
                try:
219
                    per_second = count / stats['duration_sec']
220
                except ZeroDivisionError:
221
                    per_second = 0
222
                if rate.startswith('bits'):
223 1
                    per_second *= 8
224
                count_flows.append({'flow_id': flow_id,
225 1
                                    counter: count,
226 1
                                    rate: per_second})
227 1
        return JSONResponse(count_flows)
228 1
229 1
    @listen_to('kytos/of_core.flow_stats.received')
230
    def on_stats_received(self, event):
231
        """Capture flow stats messages for OpenFlow 1.3."""
232
        self.handle_stats_received(event)
233
234
    def handle_stats_received(self, event):
235
        """Handle flow stats messages for OpenFlow 1.3."""
236
        if 'replies_flows' in event.content:
237
            replies_flows = event.content['replies_flows']
238
            self.handle_stats_reply_received(replies_flows)
239
240
    def handle_stats_reply_received(self, replies_flows):
241
        """Update the set of flows stats"""
242
        self.flows_stats_dict.update({flow.id: flow for flow in replies_flows})
243
244
    @listen_to('kytos/of_core.table_stats.received')
245
    def on_table_stats_received(self, event):
246
        """Capture table stats messages for OpenFlow 1.3."""
247
        self.handle_table_stats_received(event)
248
249
    def handle_table_stats_received(self, event):
250
        """Handle table stats messages for OpenFlow 1.3."""
251
        replies_tables = event.content['replies_tables']
252
        self.handle_table_stats_reply_received(replies_tables)
253
254
    def handle_table_stats_reply_received(self, replies_tables):
255
        """Update the set of tables stats"""
256
        for table in replies_tables:
257
            switch_id = table.switch.id
258
            if switch_id not in self.tables_stats_dict:
259
                self.tables_stats_dict[switch_id] = {}
260
            self.tables_stats_dict[switch_id][table.table_id] = table
261
262
    @alisten_to('kytos/of_core.port_stats')
263
    async def on_port_stats(self, event: KytosEvent) -> None:
264
        """Handle port stats messages for OpenFlow 1.3."""
265
        port_stats = event.content.get('port_stats')
266
        switch = event.content.get('switch')
267
        if not port_stats or not switch:
268
            return
269
        for port in port_stats:
270
            self.port_stats_dict[switch.id][port.port_no.value] = {
271
                "port_no": port.port_no.value,
272
                "rx_packets": port.rx_packets.value,
273
                "tx_packets": port.tx_packets.value,
274
                "rx_bytes": port.rx_bytes.value,
275
                "tx_bytes": port.tx_bytes.value,
276
                "rx_dropped": port.rx_dropped.value,
277
                "tx_dropped": port.tx_dropped.value,
278
                "rx_errors": port.rx_errors.value,
279
                "tx_errors": port.tx_errors.value,
280
                "rx_frame_err": port.rx_frame_err.value,
281
                "rx_over_err": port.rx_over_err.value,
282
                "rx_crc_err": port.rx_crc_err.value,
283
                "collisions": port.collisions.value,
284
                "duration_sec": port.duration_sec.value,
285
                "duration_nsec": port.duration_nsec.value,
286
            }
287