"""Main module of amlight/flow_stats Kytos Network Application.

This NApp does operations with flows not covered by Kytos itself.
"""
# pylint: disable=too-many-return-statements,too-many-instance-attributes
# pylint: disable=too-many-arguments,too-many-branches,too-many-statements
7 | |||
8 | 1 | from kytos.core import KytosNApp, log, rest |
|
9 | 1 | from kytos.core.helpers import listen_to |
|
10 | 1 | from kytos.core.rest_api import HTTPException, JSONResponse, Request |
|
11 | |||
12 | |||
# pylint: disable=too-many-public-methods
class Main(KytosNApp):
    """Main class of amlight/flow_stats NApp.

    This class is the entry point for this napp. It stores the flows
    delivered by the 'kytos/of_core.flow_stats.received' event in
    ``self.flows_stats_dict`` and exposes their counters through REST
    endpoints.
    """

    def setup(self):
        """Replace the '__init__' method for the KytosNApp subclass.

        The setup method is automatically called by the controller when your
        application is loaded.

        So, if you have any setup routine, insert it here.
        """
        log.info('Starting Kytos/Amlight flow manager')
        # Maps flow.id -> flow object; filled by handle_stats_reply_received.
        self.flows_stats_dict = {}

    def execute(self):
        """This method is executed right after the setup method execution.

        You can also use this method in loop mode if you add to the above
        setup method a line like the following example:

            self.execute_as_loop(30)  # 30-second interval.
        """

    def shutdown(self):
        """This method is executed when your napp is unloaded.

        If you have some cleanup procedure, insert it here.
        """

    @staticmethod
    def _per_second(count, duration_sec):
        """Return ``count / duration_sec``, or 0.0 for a falsy duration.

        A duration of zero would otherwise raise ZeroDivisionError inside
        the REST handlers that compute rates.
        """
        if not duration_sec:
            return 0.0
        return count / duration_sec

    def flow_from_id(self, flow_id):
        """Return the stored flow with the given flow_id, or None."""
        return self.flows_stats_dict.get(flow_id)

    def flow_stats_by_dpid_flow_id(self, dpids):
        """Auxiliary function for v1/flow/stats endpoint implementation.

        Args:
            dpids: iterable of dpids whose flows should be reported.

        Returns:
            A dict ``{dpid: {flow_id: stats}}`` containing only the dpids
            that have at least one stored flow. ``stats`` is
            ``flow.stats.as_dict()`` extended with the "cookie",
            "priority" and "match" keys.
        """
        # Build the lookup set once instead of scanning a list per flow.
        wanted_dpids = set(dpids)
        flow_stats_by_id = {}
        # Iterate over a shallow copy so the loop works on a fixed snapshot
        # (presumably to guard against updates from the stats event handler
        # while iterating -- confirm).
        for flow_id, flow in self.flows_stats_dict.copy().items():
            dpid = flow.switch.dpid
            if dpid not in wanted_dpids:
                continue
            info_flow_as_dict = flow.stats.as_dict()
            info_flow_as_dict.update({
                "cookie": flow.cookie,
                "priority": flow.priority,
                "match": flow.match.as_dict(),
            })
            flow_stats_by_id.setdefault(dpid, {})[flow_id] = info_flow_as_dict
        return flow_stats_by_id

    @rest('v1/flow/stats')
    def flow_stats(self, request: Request) -> JSONResponse:
        """Return the flows stats by dpid.

        The dpids are read from the repeated "dpid" query parameter.
        Return the stats of all flows if no dpid is given.
        """
        dpids = request.query_params.getlist("dpid")
        if not dpids:
            dpids = [sw.dpid for sw in self.controller.switches.values()]
        flow_stats_by_id = self.flow_stats_by_dpid_flow_id(dpids)
        return JSONResponse(flow_stats_by_id)

    @rest('v1/packet_count/{flow_id}')
    def packet_count(self, request: Request) -> JSONResponse:
        """Packet count of a specific flow.

        Raises:
            HTTPException: 404 when the flow_id is not stored.
        """
        flow_id = request.path_params["flow_id"]
        flow = self.flow_from_id(flow_id)
        if flow is None:
            raise HTTPException(404, detail="Flow does not exist")
        packet_stats = {
            'flow_id': flow_id,
            'packet_counter': flow.stats.packet_count,
            'packet_per_second': self._per_second(
                flow.stats.packet_count, flow.stats.duration_sec
            ),
        }
        return JSONResponse(packet_stats)

    @rest('v1/bytes_count/{flow_id}')
    def bytes_count(self, request: Request) -> JSONResponse:
        """Bytes count of a specific flow.

        Raises:
            HTTPException: 404 when the flow_id is not stored.
        """
        flow_id = request.path_params["flow_id"]
        flow = self.flow_from_id(flow_id)
        if flow is None:
            raise HTTPException(404, detail="Flow does not exist")
        bytes_stats = {
            'flow_id': flow_id,
            'bytes_counter': flow.stats.byte_count,
            'bits_per_second': self._per_second(
                flow.stats.byte_count * 8, flow.stats.duration_sec
            ),
        }
        return JSONResponse(bytes_stats)

    @rest('v1/packet_count/per_flow/{dpid}')
    def packet_count_per_flow(self, request: Request) -> JSONResponse:
        """Per flow packet count."""
        dpid = request.path_params["dpid"]
        return self.flows_counters('packet_count',
                                   dpid,
                                   counter='packet_counter',
                                   rate='packet_per_second')

    @rest('v1/bytes_count/per_flow/{dpid}')
    def bytes_count_per_flow(self, request: Request) -> JSONResponse:
        """Per flow bytes count."""
        dpid = request.path_params["dpid"]
        return self.flows_counters('byte_count',
                                   dpid,
                                   counter='bytes_counter',
                                   rate='bits_per_second')

    def flows_counters(self, field, dpid, counter=None, rate=None,
                       total=False) -> JSONResponse:
        """Calculate flows statistics for one dpid.

        Args:
            field: key of the per-flow stats dict to read (for example
                'packet_count' or 'byte_count').
            dpid: dpid whose flows are considered.
            counter: key used for the count in each per-flow entry;
                defaults to ``field``.
            rate: key used for the per-second value in each per-flow
                entry; defaults to ``field``. When it starts with 'bits'
                the per-second value is multiplied by 8.
            total: when True the response body is the sum of ``field``
                over all flows; otherwise it is a list with one entry per
                flow.

        Returns:
            JSONResponse with the sum (``total=True``) or the per-flow
            list. With no stored flows for ``dpid`` the body is 0 or [].
        """
        count_flows = 0 if total else []
        counter = counter or field
        rate = rate or field

        # We don't have statistics persistence yet, so for now this only
        # works for start and end equals to zero
        flows = self.flow_stats_by_dpid_flow_id([dpid]).get(dpid)
        if flows is None:
            return JSONResponse(count_flows)

        for flow_id, stats in flows.items():
            count = stats[field]
            if total:
                count_flows += count
                continue
            per_second = self._per_second(count, stats['duration_sec'])
            if rate.startswith('bits'):
                per_second *= 8
            count_flows.append({'flow_id': flow_id,
                                counter: count,
                                rate: per_second})
        return JSONResponse(count_flows)

    @listen_to('kytos/of_core.flow_stats.received')
    def on_stats_received(self, event):
        """Capture flow stats messages for OpenFlow 1.3."""
        self.handle_stats_received(event)

    def handle_stats_received(self, event):
        """Handle flow stats messages for OpenFlow 1.3.

        Events whose content has no 'replies_flows' key are ignored.
        """
        if 'replies_flows' in event.content:
            replies_flows = event.content['replies_flows']
            self.handle_stats_reply_received(replies_flows)

    def handle_stats_reply_received(self, replies_flows):
        """Update the set of flows stats, keyed by ``flow.id``."""
        # NOTE(review): entries are only added or overwritten here, never
        # removed -- confirm whether flows deleted from a switch should be
        # purged from flows_stats_dict.
        self.flows_stats_dict.update({flow.id: flow for flow in replies_flows})
||
169 |