Passed
Pull Request — master (#25)
by Vinicius
05:47 queued 02:05
created

build.main.Main.on_stats_received()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1.037

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 4
rs 10
c 0
b 0
f 0
ccs 2
cts 3
cp 0.6667
cc 1
nop 2
crap 1.037
1
"""Main module of amlight/kytos_stats Kytos Network Application.
2
3
This NApp does operations with flows not covered by Kytos itself.
4
"""
5
# pylint: disable=too-many-return-statements,too-many-instance-attributes
6
# pylint: disable=too-many-arguments,too-many-branches,too-many-statements
7
8 1
from collections import defaultdict
9
10 1
from kytos.core import KytosNApp, log, rest
11 1
from kytos.core.events import KytosEvent
12 1
from kytos.core.helpers import alisten_to, listen_to
13 1
from kytos.core.rest_api import HTTPException, JSONResponse, Request
14
15
16
# pylint: disable=too-many-public-methods
17 1
class Main(KytosNApp):
18
    """Main class of amlight/kytos_stats NApp.
19
    This class is the entry point for this napp.
20
    """
21
22 1
    def setup(self):
        """Create the in-memory stores used by this NApp.

        The controller calls this hook, in place of ``__init__``, when the
        application is loaded, so every instance attribute is created here.
        """
        log.info('Starting Kytos/Amlight flow manager')
        # flow objects keyed by flow id
        self.flows_stats_dict = dict()
        # table objects keyed by switch id, then by table id
        self.tables_stats_dict = dict()
        # port stats data by dpid by port_no
        self.port_stats_dict: dict[str, dict[int, dict]] = defaultdict(dict)
33
34 1
    def execute(self):
35
        """This method is executed right after the setup method execution.
36
        You can also use this method in loop mode if you add to the above setup
37
        method a line like the following example:
38
            self.execute_as_loop(30)  # 30-second interval.
39
        """
40
41 1
    def shutdown(self):
42
        """This method is executed when your napp is unloaded.
43
        If you have some cleanup procedure, insert it here.
44
        """
45
46 1
    def flow_from_id(self, flow_id):
47
        """Flow from given flow_id."""
48 1
        return self.flows_stats_dict.get(flow_id)
49
50 1
    def flow_stats_by_dpid_flow_id(self, dpids):
51
        """ Auxiliar funcion for v1/flow/stats endpoint implementation.
52
        """
53 1
        flow_stats_by_id = {}
54 1
        flows_stats_dict_copy = self.flows_stats_dict.copy()
55 1
        for flow_id, flow in flows_stats_dict_copy.items():
56
            dpid = flow.switch.dpid
57
            if dpid in dpids:
58
                if dpid not in flow_stats_by_id:
59
                    flow_stats_by_id[dpid] = {}
60
                info_flow_as_dict = flow.stats.as_dict()
61
                info_flow_as_dict.update({"cookie": flow.cookie})
62
                info_flow_as_dict.update({"priority": flow.priority})
63
                info_flow_as_dict.update({"match": flow.match.as_dict()})
64
                flow_stats_by_id[dpid].update({flow_id: info_flow_as_dict})
65 1
        return flow_stats_by_id
66
67 1
    def table_stats_by_dpid_table_id(self, dpids, table_ids):
68
        """ Auxiliar funcion for v1/table/stats endpoint implementation.
69
        """
70
        table_stats_by_id = {}
71
        tables_stats_dict_copy = self.tables_stats_dict.copy()
72
        for dpid, tables in tables_stats_dict_copy.items():
73
            if dpid not in dpids:
74
                continue
75
            table_stats_by_id[dpid] = {}
76
            if len(table_ids) == 0:
77
                table_ids = list(tables.keys())
78
            for table_id, table in tables.items():
79
                if table_id in table_ids:
80
                    table_dict = table.as_dict()
81
                    del table_dict['switch']
82
                    table_stats_by_id[dpid][table_id] = table_dict
83
        return table_stats_by_id
84
85 1
    def port_stats_filter(
86
        self, f_dpids: list[str], f_ports: list[int]
87
    ) -> dict:
88
        """ Auxiliar funcion for v1/port/stats endpoint implementation.
89
        """
90 1
        port_stats = {}
91 1
        dpid_keys = (
92
            (dpid for dpid in f_dpids if dpid in self.port_stats_dict)
93
            if f_dpids
94
            else self.port_stats_dict.keys()
95
        )
96 1
        for dpid in dpid_keys:
97 1
            port_stats[dpid] = {}
98 1
            port_keys = f_ports
99 1
            if not f_ports:
100 1
                port_keys = self.port_stats_dict[dpid].keys()
101 1
            for port_no in port_keys:
102 1
                if p_stat := self.port_stats_dict[dpid].get(port_no):
103 1
                    port_stats[dpid][port_no] = p_stat
104 1
        return port_stats
105
106 1
    @rest('v1/flow/stats')
    def flow_stats(self, request: Request) -> JSONResponse:
        """Return the flow stats grouped by dpid.

        The switches are taken from the ``dpid`` query parameter (it may be
        repeated). When it is absent, all switches known to the controller
        are used.
        """
        dpids = request.query_params.getlist("dpid")
        if not dpids:
            switches = self.controller.switches.values()
            dpids = [switch.dpid for switch in switches]
        return JSONResponse(self.flow_stats_by_dpid_flow_id(dpids))
116
117 1
    @rest('v1/table/stats')
    def table_stats(self, request: Request) -> JSONResponse:
        """Return the table stats by dpid, and optionally by table id.

        Query parameters (both may be repeated):
            dpid: switches to report; all switches known to the controller
                when absent.
            table: integer table ids to report; all tables when absent.

        Raises:
            HTTPException: 400 when a ``table`` value is not an integer.
        """
        dpids = request.query_params.getlist("dpid")
        if not dpids:
            dpids = [sw.dpid for sw in self.controller.switches.values()]
        raw_table_ids = request.query_params.getlist("table")
        try:
            table_ids = [int(table_id) for table_id in raw_table_ids]
        except ValueError as err:
            # Malformed client input is a client error, not a server error.
            raise HTTPException(
                400, detail="Query parameter 'table' must be an integer"
            ) from err
        table_stats_dpid = self.table_stats_by_dpid_table_id(dpids, table_ids)
        return JSONResponse(table_stats_dpid)
129
130 1
    @rest('v1/port/stats')
    async def port_stats(self, request: Request) -> JSONResponse:
        """Return the port stats by dpid, and optionally by port.

        Query parameters (both may be repeated):
            dpid: switches to report; all switches with stored stats when
                absent.
            port: integer port numbers to report; all ports when absent.

        Raises:
            HTTPException: 400 when a ``port`` value is not an integer.
        """
        dpids = request.query_params.getlist("dpid")
        raw_ports = request.query_params.getlist("port")
        try:
            ports = [int(port) for port in raw_ports]
        except ValueError as err:
            # Malformed client input is a client error, not a server error.
            raise HTTPException(
                400, detail="Query parameter 'port' must be an integer"
            ) from err
        return JSONResponse(self.port_stats_filter(dpids, ports))
136
137 1
    @rest('v1/packet_count/{flow_id}')
    def packet_count(self, request: Request) -> JSONResponse:
        """Return the packet counter and packet rate of a specific flow.

        Raises:
            HTTPException: 404 when no flow is stored under ``flow_id``.
        """
        flow_id = request.path_params["flow_id"]
        flow = self.flow_from_id(flow_id)
        if flow is None:
            raise HTTPException(404, detail="Flow does not exist")
        stats = flow.stats
        try:
            rate = stats.packet_count / stats.duration_sec
        except ZeroDivisionError:
            # A flow with zero duration has no meaningful rate yet.
            rate = 0
        return JSONResponse({
            'flow_id': flow_id,
            'packet_counter': stats.packet_count,
            'packet_per_second': rate,
        })
155
156 1
    @rest('v1/bytes_count/{flow_id}')
    def bytes_count(self, request: Request) -> JSONResponse:
        """Return the byte counter and bit rate of a specific flow.

        Raises:
            HTTPException: 404 when no flow is stored under ``flow_id``.
        """
        flow_id = request.path_params["flow_id"]
        flow = self.flow_from_id(flow_id)
        if flow is None:
            raise HTTPException(404, detail="Flow does not exist")
        stats = flow.stats
        try:
            # Bytes are converted to bits before dividing by the duration.
            rate = stats.byte_count * 8 / stats.duration_sec
        except ZeroDivisionError:
            # A flow with zero duration has no meaningful rate yet.
            rate = 0
        return JSONResponse({
            'flow_id': flow_id,
            'bytes_counter': stats.byte_count,
            'bits_per_second': rate,
        })
174
175 1
    @rest('v1/packet_count/per_flow/{dpid}')
    def packet_count_per_flow(self, request: Request) -> JSONResponse:
        """Return packet counter and packet rate for each flow of a switch."""
        switch_dpid = request.path_params["dpid"]
        labels = {'counter': 'packet_counter', 'rate': 'packet_per_second'}
        return self.flows_counters('packet_count', switch_dpid, **labels)
183
184 1
    @rest('v1/bytes_count/per_flow/{dpid}')
    def bytes_count_per_flow(self, request: Request) -> JSONResponse:
        """Return byte counter and bit rate for each flow of a switch."""
        switch_dpid = request.path_params["dpid"]
        labels = {'counter': 'bytes_counter', 'rate': 'bits_per_second'}
        return self.flows_counters('byte_count', switch_dpid, **labels)
192
193 1
    def flows_counters(self, field, dpid, counter=None, rate=None,
                       total=False) -> JSONResponse:
        """Calculate flow statistics for one switch.

        ``field`` is the stats key to read from each flow of ``dpid``.

        With ``total`` set, the response carries the sum of ``field`` over
        all flows. Otherwise it carries one entry per flow with the flow id,
        the value (under the ``counter`` key) and the per-second rate (under
        the ``rate`` key); both keys default to ``field``. Rates whose key
        starts with ``'bits'`` are multiplied by 8.
        """
        # We don't have statistics persistence yet, so for now this only works
        # for start and end equals to zero
        flows = self.flow_stats_by_dpid_flow_id([dpid]).get(dpid)
        if flows is None:
            # Nothing stored for this switch: behave as if it had no flows.
            flows = {}

        if total:
            return JSONResponse(sum(stats[field] for stats in flows.values()))

        counter = counter or field
        rate = rate or field
        rows = []
        for flow_id, stats in flows.items():
            count = stats[field]
            try:
                per_second = count / stats['duration_sec']
            except ZeroDivisionError:
                per_second = 0
            if rate.startswith('bits'):
                per_second *= 8
            rows.append({'flow_id': flow_id,
                         counter: count,
                         rate: per_second})
        return JSONResponse(rows)
230
231 1
    @listen_to('kytos/of_core.flow_stats.received')
    def on_stats_received(self, event):
        """Listen for flow stats events and delegate to the handler."""
        self.handle_stats_received(event)
235
236 1
    def handle_stats_received(self, event):
237
        """Handle flow stats messages for OpenFlow 1.3."""
238 1
        if 'replies_flows' in event.content:
239 1
            replies_flows = event.content['replies_flows']
240 1
            self.handle_stats_reply_received(replies_flows)
241
242 1
    def handle_stats_reply_received(self, replies_flows):
243
        """Update the set of flows stats"""
244 1
        self.flows_stats_dict.update({flow.id: flow for flow in replies_flows})
245
246 1
    @listen_to('kytos/of_core.table_stats.received')
    def on_table_stats_received(self, event):
        """Listen for table stats events and delegate to the handler."""
        self.handle_table_stats_received(event)
250
251 1
    def handle_table_stats_received(self, event):
252
        """Handle table stats messages for OpenFlow 1.3."""
253
        replies_tables = event.content['replies_tables']
254
        self.handle_table_stats_reply_received(replies_tables)
255
256 1
    def handle_table_stats_reply_received(self, replies_tables):
257
        """Update the set of tables stats"""
258 1
        for table in replies_tables:
259 1
            switch_id = table.switch.id
260 1
            if switch_id not in self.tables_stats_dict:
261 1
                self.tables_stats_dict[switch_id] = {}
262 1
            self.tables_stats_dict[switch_id][table.table_id] = table
263
264 1
    @alisten_to('kytos/of_core.port_stats')
    async def on_port_stats(self, event: KytosEvent) -> None:
        """Store the port stats carried by a port stats event.

        The event content is expected to provide ``'port_stats'`` and
        ``'switch'``; the event is ignored when either is missing or falsy.
        Each port is stored as a plain dict under the switch id and then the
        port number.
        """
        content = event.content
        port_stats = content.get('port_stats')
        switch = content.get('switch')
        if not (port_stats and switch):
            return
        # Attributes copied from each port entry; each one is read through
        # its ``.value``.
        field_names = (
            "port_no",
            "rx_packets",
            "tx_packets",
            "rx_bytes",
            "tx_bytes",
            "rx_dropped",
            "tx_dropped",
            "rx_errors",
            "tx_errors",
            "rx_frame_err",
            "rx_over_err",
            "rx_crc_err",
            "collisions",
            "duration_sec",
            "duration_nsec",
        )
        for port in port_stats:
            record = {name: getattr(port, name).value for name in field_names}
            self.port_stats_dict[switch.id][port.port_no.value] = record
289