Test Failed
Pull Request — master (#25)
by Italo Valcy
08:00 queued 03:33
created

build.main   B

Complexity

Total Complexity 44

Size/Duplication

Total Lines 230
Duplicated Lines 0 %

Test Coverage

Coverage 73.88%

Importance

Changes 0
Metric Value
wmc 44
eloc 153
dl 0
loc 230
ccs 99
cts 134
cp 0.7388
rs 8.8798
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex classes like build.main often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Main module of amlight/kytos_stats Kytos Network Application.
2
3
This NApp does operations with flows not covered by Kytos itself.
4
"""
5
# pylint: disable=too-many-return-statements,too-many-instance-attributes
6
# pylint: disable=too-many-arguments,too-many-branches,too-many-statements
7
8 1
from kytos.core import KytosNApp, log, rest
9 1
from kytos.core.helpers import listen_to
10 1
from kytos.core.rest_api import HTTPException, JSONResponse, Request
11
12
13
# pylint: disable=too-many-public-methods
14 1
class Main(KytosNApp):
15
    """Main class of amlight/kytos_stats NApp.
16
    This class is the entry point for this napp.
17
    """
18
19 1
    def setup(self):
20
        """Replace the '__init__' method for the KytosNApp subclass.
21
        The setup method is automatically called by the controller when your
22
        application is loaded.
23
        So, if you have any setup routine, insert it here.
24
        """
25 1
        log.info('Starting Kytos/Amlight flow manager')
26 1
        self.flows_stats_dict = {}
27 1
        self.tables_stats_dict = {}
28
        self.port_stats_dict = {}
29 1
30
    def execute(self):
31
        """This method is executed right after the setup method execution.
32
        You can also use this method in loop mode if you add to the above setup
33
        method a line like the following example:
34
            self.execute_as_loop(30)  # 30-second interval.
35
        """
36 1
37
    def shutdown(self):
38
        """This method is executed when your napp is unloaded.
39
        If you have some cleanup procedure, insert it here.
40
        """
41 1
42
    def flow_from_id(self, flow_id):
43 1
        """Flow from given flow_id."""
44
        return self.flows_stats_dict.get(flow_id)
45 1
46
    def flow_stats_by_dpid_flow_id(self, dpids):
47
        """ Auxiliar funcion for v1/flow/stats endpoint implementation.
48 1
        """
49 1
        flow_stats_by_id = {}
50 1
        flows_stats_dict_copy = self.flows_stats_dict.copy()
51
        for flow_id, flow in flows_stats_dict_copy.items():
52
            dpid = flow.switch.dpid
53
            if dpid in dpids:
54
                if dpid not in flow_stats_by_id:
55
                    flow_stats_by_id[dpid] = {}
56
                info_flow_as_dict = flow.stats.as_dict()
57
                info_flow_as_dict.update({"cookie": flow.cookie})
58
                info_flow_as_dict.update({"priority": flow.priority})
59
                info_flow_as_dict.update({"match": flow.match.as_dict()})
60 1
                flow_stats_by_id[dpid].update({flow_id: info_flow_as_dict})
61
        return flow_stats_by_id
62 1
63
    def table_stats_by_dpid_table_id(self, dpids, table_ids):
64
        """ Auxiliar funcion for v1/table/stats endpoint implementation.
65
        """
66
        table_stats_by_id = {}
67
        tables_stats_dict_copy = self.tables_stats_dict.copy()
68
        for dpid, tables in tables_stats_dict_copy.items():
69
            if dpid not in dpids:
70
                continue
71
            table_stats_by_id[dpid] = {}
72
            if len(table_ids) == 0:
73
                table_ids = list(tables.keys())
74
            for table_id, table in tables.items():
75
                if table_id in table_ids:
76
                    table_dict = table.as_dict()
77
                    del table_dict['switch']
78
                    table_stats_by_id[dpid][table_id] = table_dict
79
        return table_stats_by_id
80 1
81 1
    def port_stats_filter(self, f_dpids: list[str], f_ports: list[int]) -> dict:
82
        """ Auxiliar funcion for v1/port/stats endpoint implementation.
83
        """
84
        port_stats = {}
85 1
        dpid_keys = f_dpids if f_dpids else self.port_stats_dict.keys()
86 1
        for dpid in dpid_keys:
87 1
            port_stats[dpid] = {}
88 1
            port_keys = f_ports
89 1
            if not f_ports:
90
                port_keys = self.port_stats_dict.get(dpid, {}).keys()
91 1
            for port_no in port_keys:
92 1
                if p_stat := self.port_stats_dict[dpid].get(port_no):
93
                    port_stats[dpid][port_no] = p_stat
94
        print(port_stats)
95
        return port_stats
96 1
97 1
    @rest('v1/flow/stats')
98 1
    def flow_stats(self, request: Request) -> JSONResponse:
99 1
        """Return the flows stats by dpid.
100 1
        Return the stats of all flows if dpid is None
101 1
        """
102 1
        dpids = request.query_params.getlist("dpid")
103
        if len(dpids) == 0:
104 1
            dpids = [sw.dpid for sw in self.controller.switches.values()]
105 1
        flow_stats_by_id = self.flow_stats_by_dpid_flow_id(dpids)
106
        return JSONResponse(flow_stats_by_id)
107 1
108 1
    @rest('v1/table/stats')
109 1
    def table_stats(self, request: Request) -> JSONResponse:
110 1
        """Return the table stats by dpid,
111 1
        and optionally by table_id.
112 1
        """
113
        dpids = request.query_params.getlist("dpid")
114
        if len(dpids) == 0:
115
            dpids = [sw.dpid for sw in self.controller.switches.values()]
116 1
        table_ids = request.query_params.getlist("table")
117
        table_ids = list(map(int, table_ids))
118
        table_stats_dpid = self.table_stats_by_dpid_table_id(dpids, table_ids)
119
        return JSONResponse(table_stats_dpid)
120
121 1
    @rest('v1/port/stats')
122
    async def port_stats(self, request: Request) -> JSONResponse:
123 1
        """Return the port stats by dpid, and optionally by port."""
124 1
        dpids = request.query_params.getlist("dpid")
125
        ports = list(map(int, request.query_params.getlist("port")))
126 1
        return JSONResponse(self.port_stats_filter(dpids, ports))
127 1
128 1
    @rest('v1/packet_count/{flow_id}')
129 1
    def packet_count(self, request: Request) -> JSONResponse:
130 1
        """Packet count of an specific flow."""
131 1
        flow_id = request.path_params["flow_id"]
132
        flow = self.flow_from_id(flow_id)
133
        if flow is None:
134
            raise HTTPException(404, detail="Flow does not exist")
135 1
        try:
136
            packet_per_second = \
137
                flow.stats.packet_count / flow.stats.duration_sec
138
        except ZeroDivisionError:
139
            packet_per_second = 0
140 1
        packet_stats = {
141
            'flow_id': flow_id,
142 1
            'packet_counter': flow.stats.packet_count,
143 1
            'packet_per_second': packet_per_second
144
            }
145 1
        return JSONResponse(packet_stats)
146 1
147
    @rest('v1/bytes_count/{flow_id}')
148
    def bytes_count(self, request: Request) -> JSONResponse:
149
        """Bytes count of an specific flow."""
150
        flow_id = request.path_params["flow_id"]
151 1
        flow = self.flow_from_id(flow_id)
152 1
        if flow is None:
153
            raise HTTPException(404, detail="Flow does not exist")
154 1
        try:
155 1
            bits_per_second = \
156
                flow.stats.byte_count * 8 / flow.stats.duration_sec
157
        except ZeroDivisionError:
158
            bits_per_second = 0
159
        bytes_stats = {
160 1
            'flow_id': flow_id,
161
            'bytes_counter': flow.stats.byte_count,
162
            'bits_per_second': bits_per_second
163
            }
164
        return JSONResponse(bytes_stats)
165
166 1
    @rest('v1/packet_count/per_flow/{dpid}')
167
    def packet_count_per_flow(self, request: Request) -> JSONResponse:
168
        """Per flow packet count."""
169 1
        dpid = request.path_params["dpid"]
170 1
        return self.flows_counters('packet_count',
171
                                   dpid,
172 1
                                   counter='packet_counter',
173
                                   rate='packet_per_second')
174
175
    @rest('v1/bytes_count/per_flow/{dpid}')
176
    def bytes_count_per_flow(self, request: Request) -> JSONResponse:
177 1
        """Per flow bytes count."""
178 1
        dpid = request.path_params["dpid"]
179
        return self.flows_counters('byte_count',
180 1
                                   dpid,
181 1
                                   counter='bytes_counter',
182 1
                                   rate='bits_per_second')
183 1
184 1
    def flows_counters(self, field, dpid, counter=None, rate=None,
185
                       total=False) -> JSONResponse:
186
        """Calculate flows statistics.
187 1
        The returned statistics are both per flow and for the sum of flows
188 1
        """
189 1
190 1
        if total:
191 1
            count_flows = 0
192 1
        else:
193 1
            count_flows = []
194
            if not counter:
195
                counter = field
196 1
            if not rate:
197
                rate = field
198 1
199 1
        # We don't have statistics persistence yet, so for now this only works
200
        # for start and end equals to zero
201
        flows = self.flow_stats_by_dpid_flow_id([dpid])
202
        flows = flows.get(dpid)
203 1
204
        if flows is None:
205 1
            return JSONResponse(count_flows)
206 1
        for flow_id, stats in flows.items():
207 1
            count = stats[field]
208
            if total:
209 1
                count_flows += count
210
            else:
211 1
                try:
212
                    per_second = count / stats['duration_sec']
213 1
                except ZeroDivisionError:
214 1
                    per_second = 0
215
                if rate.startswith('bits'):
216
                    per_second *= 8
217
                count_flows.append({'flow_id': flow_id,
218 1
                                    counter: count,
219
                                    rate: per_second})
220
        return JSONResponse(count_flows)
221
222
    @listen_to('kytos/of_core.flow_stats.received')
223 1
    def on_stats_received(self, event):
224
        """Capture flow stats messages for OpenFlow 1.3."""
225 1
        self.handle_stats_received(event)
226 1
227 1
    def handle_stats_received(self, event):
228 1
        """Handle flow stats messages for OpenFlow 1.3."""
229 1
        if 'replies_flows' in event.content:
230
            replies_flows = event.content['replies_flows']
231
            self.handle_stats_reply_received(replies_flows)
232
233
    def handle_stats_reply_received(self, replies_flows):
234
        """Update the set of flows stats"""
235
        self.flows_stats_dict.update({flow.id: flow for flow in replies_flows})
236
237
    @listen_to('kytos/of_core.table_stats.received')
238
    def on_table_stats_received(self, event):
239
        """Capture table stats messages for OpenFlow 1.3."""
240
        self.handle_table_stats_received(event)
241
242
    def handle_table_stats_received(self, event):
243
        """Handle table stats messages for OpenFlow 1.3."""
244
        replies_tables = event.content['replies_tables']
245
        self.handle_table_stats_reply_received(replies_tables)
246
247
    def handle_table_stats_reply_received(self, replies_tables):
248
        """Update the set of tables stats"""
249
        for table in replies_tables:
250
            switch_id = table.switch.id
251
            if switch_id not in self.tables_stats_dict:
252
                self.tables_stats_dict[switch_id] = {}
253
            self.tables_stats_dict[switch_id][table.table_id] = table
254
255
    @listen_to('kytos/of_core.port_stats')
256
    def on_port_stats(self, event):
257
        """Capture port stats messages for OpenFlow 1.3."""
258
        self.handle_port_stats(event)
259
260
    def handle_port_stats(self, event):
261
        """Handle port stats messages for OpenFlow 1.3."""
262
        port_stats = event.content.get('port_stats')
263
        switch = event.content.get('switch')
264
        if not port_stats or not switch:
265
            return
266
        for port in port_stats:
267
            self.port_stats_dict.setdefault(switch.id, {})
268
            self.port_stats_dict[switch.id][port.port_no.value] = {
269
                "port_no": port.port_no.value,
270
                "rx_packets": port.rx_packets.value,
271
                "tx_packets": port.tx_packets.value,
272
                "rx_bytes": port.rx_bytes.value,
273
                "tx_bytes": port.tx_bytes.value,
274
                "rx_dropped": port.rx_dropped.value,
275
                "tx_dropped": port.tx_dropped.value,
276
                "rx_errors": port.rx_errors.value,
277
                "tx_errors": port.tx_errors.value,
278
                "rx_frame_err": port.rx_frame_err.value,
279
                "rx_over_err": port.rx_over_err.value,
280
                "rx_crc_err": port.rx_crc_err.value,
281
                "collisions": port.collisions.value,
282
                "duration_sec": port.duration_sec.value,
283
                "duration_nsec": port.duration_nsec.value,
284
            }
285