build.main   A
last analyzed

Complexity

Total Complexity 28

Size/Duplication

Total Lines 169
Duplicated Lines 0 %

Test Coverage

Coverage 83.72%

Importance

Changes 0
Metric Value
wmc 28
eloc 105
dl 0
loc 169
rs 10
c 0
b 0
f 0
ccs 72
cts 86
cp 0.8372

14 Methods

Rating   Name   Duplication   Size   Complexity  
A Main.shutdown() 0 2 1
A Main.setup() 0 8 1
A Main.execute() 0 2 1
A Main.flow_from_id() 0 3 1
A Main.flow_stats_by_dpid_flow_id() 0 16 4
A Main.bytes_count() 0 14 2
A Main.handle_stats_received() 0 5 2
B Main.flows_counters() 0 34 8
A Main.bytes_count_per_flow() 0 8 1
A Main.packet_count() 0 14 2
A Main.handle_stats_reply_received() 0 3 1
A Main.on_stats_received() 0 4 1
A Main.flow_stats() 0 10 2
A Main.packet_count_per_flow() 0 8 1
1
"""Main module of amlight/flow_stats Kytos Network Application.
2
3
This NApp does operations with flows not covered by Kytos itself.
4
"""
5
# pylint: disable=too-many-return-statements,too-many-instance-attributes
6
# pylint: disable=too-many-arguments,too-many-branches,too-many-statements
7
8 1
from kytos.core import KytosNApp, log, rest
9 1
from kytos.core.helpers import listen_to
10 1
from kytos.core.rest_api import HTTPException, JSONResponse, Request
11
12
13
# pylint: disable=too-many-public-methods
14 1
class Main(KytosNApp):
15
    """Main class of amlight/flow_stats NApp.
16
    This class is the entry point for this napp.
17
    """
18
19 1
    def setup(self):
20
        """Replace the '__init__' method for the KytosNApp subclass.
21
        The setup method is automatically called by the controller when your
22
        application is loaded.
23
        So, if you have any setup routine, insert it here.
24
        """
25 1
        log.info('Starting Kytos/Amlight flow manager')
26 1
        self.flows_stats_dict = {}
27
28 1
    def execute(self):
29
        """This method is executed right after the setup method execution.
30
        You can also use this method in loop mode if you add to the above setup
31
        method a line like the following example:
32
            self.execute_as_loop(30)  # 30-second interval.
33
        """
34
35 1
    def shutdown(self):
36
        """This method is executed when your napp is unloaded.
37
        If you have some cleanup procedure, insert it here.
38
        """
39
40 1
    def flow_from_id(self, flow_id):
41
        """Flow from given flow_id."""
42 1
        return self.flows_stats_dict.get(flow_id)
43
44 1
    def flow_stats_by_dpid_flow_id(self, dpids):
45
        """ Auxiliar funcion for v1/flow/stats endpoint implementation.
46
        """
47 1
        flow_stats_by_id = {}
48 1
        flows_stats_dict_copy = self.flows_stats_dict.copy()
49 1
        for flow_id, flow in flows_stats_dict_copy.items():
50
            dpid = flow.switch.dpid
51
            if dpid in dpids:
52
                if dpid not in flow_stats_by_id:
53
                    flow_stats_by_id[dpid] = {}
54
                info_flow_as_dict = flow.stats.as_dict()
55
                info_flow_as_dict.update({"cookie": flow.cookie})
56
                info_flow_as_dict.update({"priority": flow.priority})
57
                info_flow_as_dict.update({"match": flow.match.as_dict()})
58
                flow_stats_by_id[dpid].update({flow_id: info_flow_as_dict})
59 1
        return flow_stats_by_id
60
61 1
    @rest('v1/flow/stats')
62 1
    def flow_stats(self, request: Request) -> JSONResponse:
63
        """Return the flows stats by dpid.
64
        Return the stats of all flows if dpid is None
65
        """
66 1
        dpids = request.query_params.getlist("dpid")
67 1
        if len(dpids) == 0:
68 1
            dpids = [sw.dpid for sw in self.controller.switches.values()]
69 1
        flow_stats_by_id = self.flow_stats_by_dpid_flow_id(dpids)
70 1
        return JSONResponse(flow_stats_by_id)
71
72 1
    @rest('v1/packet_count/{flow_id}')
73 1
    def packet_count(self, request: Request) -> JSONResponse:
74
        """Packet count of an specific flow."""
75 1
        flow_id = request.path_params["flow_id"]
76 1
        flow = self.flow_from_id(flow_id)
77 1
        if flow is None:
78 1
            raise HTTPException(404, detail="Flow does not exist")
79 1
        packet_stats = {
80
            'flow_id': flow_id,
81
            'packet_counter': flow.stats.packet_count,
82
            'packet_per_second':
83
                flow.stats.packet_count / flow.stats.duration_sec
84
            }
85 1
        return JSONResponse(packet_stats)
86
87 1
    @rest('v1/bytes_count/{flow_id}')
88 1
    def bytes_count(self, request: Request) -> JSONResponse:
89
        """Bytes count of an specific flow."""
90 1
        flow_id = request.path_params["flow_id"]
91 1
        flow = self.flow_from_id(flow_id)
92 1
        if flow is None:
93 1
            raise HTTPException(404, detail="Flow does not exist")
94 1
        bytes_stats = {
95
            'flow_id': flow_id,
96
            'bytes_counter': flow.stats.byte_count,
97
            'bits_per_second':
98
                flow.stats.byte_count * 8 / flow.stats.duration_sec
99
            }
100 1
        return JSONResponse(bytes_stats)
101
102 1
    @rest('v1/packet_count/per_flow/{dpid}')
103 1
    def packet_count_per_flow(self, request: Request) -> JSONResponse:
104
        """Per flow packet count."""
105 1
        dpid = request.path_params["dpid"]
106 1
        return self.flows_counters('packet_count',
107
                                   dpid,
108
                                   counter='packet_counter',
109
                                   rate='packet_per_second')
110
111 1
    @rest('v1/bytes_count/per_flow/{dpid}')
112 1
    def bytes_count_per_flow(self, request: Request) -> JSONResponse:
113
        """Per flow bytes count."""
114 1
        dpid = request.path_params["dpid"]
115 1
        return self.flows_counters('byte_count',
116
                                   dpid,
117
                                   counter='bytes_counter',
118
                                   rate='bits_per_second')
119
120 1
    def flows_counters(self, field, dpid, counter=None, rate=None,
121
                       total=False) -> JSONResponse:
122
        """Calculate flows statistics.
123
        The returned statistics are both per flow and for the sum of flows
124
        """
125
126 1
        if total:
127
            count_flows = 0
128
        else:
129 1
            count_flows = []
130 1
            if not counter:
131
                counter = field
132 1
            if not rate:
133
                rate = field
134
135
        # We don't have statistics persistence yet, so for now this only works
136
        # for start and end equals to zero
137 1
        flows = self.flow_stats_by_dpid_flow_id([dpid])
138 1
        flows = flows.get(dpid)
139
140 1
        if flows is None:
141 1
            return JSONResponse(count_flows)
142 1
        for flow_id, stats in flows.items():
143 1
            count = stats[field]
144 1
            if total:
145
                count_flows += count
146
            else:
147 1
                per_second = count / stats['duration_sec']
148 1
                if rate.startswith('bits'):
149 1
                    per_second *= 8
150 1
                count_flows.append({'flow_id': flow_id,
151
                                    counter: count,
152
                                    rate: per_second})
153 1
        return JSONResponse(count_flows)
154
155 1
    @listen_to('kytos/of_core.flow_stats.received')
156 1
    def on_stats_received(self, event):
157
        """Capture flow stats messages for OpenFlow 1.3."""
158
        self.handle_stats_received(event)
159
160 1
    def handle_stats_received(self, event):
161
        """Handle flow stats messages for OpenFlow 1.3."""
162 1
        if 'replies_flows' in event.content:
163 1
            replies_flows = event.content['replies_flows']
164 1
            self.handle_stats_reply_received(replies_flows)
165
166 1
    def handle_stats_reply_received(self, replies_flows):
167
        """Update the set of flows stats"""
168
        self.flows_stats_dict.update({flow.id: flow for flow in replies_flows})
169