Passed
Push — master ( b0712d...412579 )
by Vinicius
03:16 queued 12s
created

build.main.Main.match_and_apply()   C

Complexity

Conditions 11

Size

Total Lines 30
Code Lines 24

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 104.2109

Importance

Changes 0
Metric Value
eloc 24
dl 0
loc 30
rs 5.4
c 0
b 0
f 0
ccs 2
cts 24
cp 0.0833
cc 11
nop 2
crap 104.2109

How to fix   Complexity   

Complexity

Complex classes like build.main.Main.match_and_apply() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

"""Main module of amlight/flow_stats Kytos Network Application.

This NApp does operations with flows not covered by Kytos itself.
"""
5
# pylint: disable=too-many-return-statements,too-many-instance-attributes
6
# pylint: disable=too-many-arguments,too-many-branches,too-many-statements
7
8 1
import hashlib
9 1
import ipaddress
10 1
import json
11 1
from threading import Lock
12
13 1
from flask import jsonify, request
14 1
from kytos.core import KytosEvent, KytosNApp, log, rest
15 1
from kytos.core.helpers import listen_to
16 1
from napps.amlight.flow_stats.utils import format_request
17 1
from napps.kytos.of_core.v0x04.flow import Action as Action13
18 1
from napps.kytos.of_core.v0x04.match_fields import MatchFieldFactory
19
20
21 1
class GenericFlow():
    """Class to represent a flow.

    This class represents a flow regardless of the OF version.

    Attributes mirror the constructor arguments. ``match`` maps a match
    field name to an object exposing a ``value`` attribute and ``actions``
    is a list of action objects exposing ``as_dict()``.
    """

    # Names of the match fields that are compared as masked IP addresses
    # instead of by plain equality.
    _IP_FIELDS = ('ipv4_src', 'ipv4_dst', 'ipv6_src', 'ipv6_dst')

    # Flows are mutable and compared through their computed ``id``, so they
    # stay unhashable. This is what defining ``__eq__`` alone already
    # implied; it is only made explicit here.
    __hash__ = None

    def __init__(self, version='0x04', match=None, idle_timeout=0,
                 hard_timeout=0, duration_sec=0, packet_count=0, byte_count=0,
                 priority=0, table_id=0xff, cookie=None, buffer_id=None,
                 actions=None):
        """Store the flow attributes.

        Falsy ``match``/``actions`` are replaced by a fresh empty dict/list
        so that instances never share a mutable default.
        """
        self.version = version
        self.match = match if match else {}
        self.idle_timeout = idle_timeout
        self.hard_timeout = hard_timeout
        self.duration_sec = duration_sec
        self.packet_count = packet_count
        self.byte_count = byte_count
        self.priority = priority
        self.table_id = table_id
        self.cookie = cookie
        self.buffer_id = buffer_id
        self.actions = actions if actions else []

    def __eq__(self, other):
        """Two flows are equal when their ``id`` digests are equal.

        Comparing with anything that is not a ``GenericFlow`` yields
        ``NotImplemented`` (so ``flow == 3`` is ``False``) instead of
        raising ``AttributeError``.
        """
        if not isinstance(other, GenericFlow):
            return NotImplemented
        return self.id == other.id

    @property
    def id(self):
        # pylint: disable=invalid-name
        """Return the hash of the object.

        The digest is the md5 of the string forms of, in this order: the
        version, every match value, idle_timeout, hard_timeout, priority,
        table_id, cookie and buffer_id. Counters and actions are not part
        of it.

        Returns:
            string: Hash of object.
        """
        hash_result = hashlib.md5()

        def feed(item):
            hash_result.update(str(item).encode('utf-8'))

        feed(self.version)
        # Iterate over a copy so a concurrent update of ``match`` cannot
        # break the iteration.
        for match_field in self.match.copy().values():
            feed(match_field.value)
        for item in (self.idle_timeout, self.hard_timeout, self.priority,
                     self.table_id, self.cookie, self.buffer_id):
            feed(item)

        return hash_result.hexdigest()

    def to_dict(self):
        """Convert flow to a dictionary.

        Match fields are flattened into the top level of the dictionary,
        right after ``version``; the fixed keys written afterwards win on a
        name clash. Counters (duration, packets, bytes) are not included.
        """
        flow_dict = {'version': self.version}
        flow_dict.update(self.match_to_dict())
        flow_dict.update({
            'idle_timeout': self.idle_timeout,
            'hard_timeout': self.hard_timeout,
            'priority': self.priority,
            'table_id': self.table_id,
            'cookie': self.cookie,
            'buffer_id': self.buffer_id,
            'actions': [action.as_dict() for action in self.actions],
        })
        return flow_dict

    def match_to_dict(self):
        """Convert a match in OF 1.3 to a dictionary of name -> value."""
        return {name: match_field.value
                for name, match_field in self.match.copy().items()}

    def to_json(self):
        """Return a json version of the flow."""
        return json.dumps(self.to_dict())

    @staticmethod
    def _match_field_name(match_field):
        """Return the key used in ``match`` for a decoded match field.

        ``dl_vlan`` is stored as ``vlan_vid``; every other name is kept.
        """
        name = match_field.name
        return 'vlan_vid' if name == 'dl_vlan' else name

    @classmethod
    def from_flow_stats(cls, flow_stats, version='0x04'):
        """Create a flow from OF flow stats.

        Timeouts, priority, table id and counters are read from the
        ``.value`` of the matching ``flow_stats`` attributes. Match fields
        and actions are only decoded when ``version`` is ``'0x04'``.
        """
        # NOTE(review): unlike ``from_replies_flows`` the cookie is not
        # copied here - confirm whether that is intentional.
        flow = cls(version=version)
        flow.idle_timeout = flow_stats.idle_timeout.value
        flow.hard_timeout = flow_stats.hard_timeout.value
        flow.priority = flow_stats.priority.value
        flow.table_id = flow_stats.table_id.value
        flow.duration_sec = flow_stats.duration_sec.value
        flow.packet_count = flow_stats.packet_count.value
        flow.byte_count = flow_stats.byte_count.value
        if version == '0x04':
            for match in flow_stats.match.oxm_match_fields:
                match_field = MatchFieldFactory.from_of_tlv(match)
                flow.match[cls._match_field_name(match_field)] = match_field
            # NOTE(review): assumes instruction_type compares equal to the
            # string 'apply_actions' for these objects - TODO confirm.
            flow.actions = [
                Action13.from_of_action(of_action)
                for instruction in flow_stats.instructions
                if instruction.instruction_type == 'apply_actions'
                for of_action in instruction.actions
            ]
        return flow

    @classmethod
    def from_replies_flows(cls, flow04):
        """Create a flow from a flow passed on
        replies_flows in event kytos/of_core.flow_stats.received.

        The resulting flow always has version ``'0x04'``. Counters come
        from ``flow04.stats`` and the actions of every ``apply_actions``
        instruction are stored as received (not converted).
        """
        flow = cls(version='0x04')
        flow.idle_timeout = flow04.idle_timeout
        flow.hard_timeout = flow04.hard_timeout
        flow.priority = flow04.priority
        flow.table_id = flow04.table_id
        flow.cookie = flow04.cookie
        flow.duration_sec = flow04.stats.duration_sec
        flow.packet_count = flow04.stats.packet_count
        flow.byte_count = flow04.stats.byte_count

        as_of_match = flow04.match.as_of_match()
        for match in as_of_match.oxm_match_fields:
            match_field = MatchFieldFactory.from_of_tlv(match)
            flow.match[cls._match_field_name(match_field)] = match_field
        flow.actions = [
            of_action
            for instruction in flow04.instructions
            if instruction.instruction_type == 'apply_actions'
            for of_action in instruction.actions
        ]
        return flow

    def do_match(self, args):
        """Match a packet against this flow.

        Returns the result of ``match13`` for version ``'0x04'`` and
        ``None`` for any other version.
        """
        if self.version == '0x04':
            return self.match13(args)
        return None

    def match13(self, args):
        """Match a packet against this flow (OF1.3).

        Args:
            args: dict of packet field name -> value. ``vlan_vid`` holds a
                sequence whose last element is the one compared.

        Returns:
            ``self`` when every match field of the flow is satisfied by the
            packet, ``False`` otherwise. A field missing from ``args``, an
            empty VLAN stack or an unparsable IP address is a mismatch.
        """
        for name, match_field in self.match.copy().items():
            if name not in args:
                return False
            if name == 'vlan_vid':
                vlan_stack = args[name]
                if not vlan_stack:
                    # No tag left on the packet: nothing to compare with.
                    return False
                field = vlan_stack[-1]
            else:
                field = args[name]
            if name in self._IP_FIELDS:
                try:
                    packet_ip = int(ipaddress.ip_address(field))
                except ValueError:
                    # A malformed address cannot belong to any network.
                    return False
                ip_addr = match_field.value
                # The packet matches when its masked address is the
                # address stored in the flow.
                if packet_ip & ip_addr.netmask != ip_addr.address:
                    return False
            elif match_field.value != field:
                return False
        return self
197
198
199
# pylint: disable=too-many-public-methods
200 1
class Main(KytosNApp):
    """Main class of amlight/flow_stats NApp.

    This class is the entry point for this napp. It keeps, on every switch
    object, a ``generic_flows`` list built from the received flow stats and
    exposes REST endpoints to match packets against those flows and to read
    their counters.
    """

    def setup(self):
        """Replace the '__init__' method for the KytosNApp subclass.

        The setup method is automatically called by the controller when your
        application is loaded.

        Here it resets ``generic_flows`` on every known switch and creates
        the per-switch xid and lock registries used by
        ``handle_stats_reply``.
        """
        log.info('Starting Kytos/Amlight flow manager')
        for switch in self.controller.switches.copy().values():
            switch.generic_flows = []
        self.switch_stats_xid = {}
        self.switch_stats_lock = {}

    def execute(self):
        """This method is executed right after the setup method execution.

        You can also use this method in loop mode if you add to the above setup
        method a line like the following example:

            self.execute_as_loop(30)  # 30-second interval.
        """

    def shutdown(self):
        """This method is executed when your napp is unloaded.

        If you have some cleanup procedure, insert it here.
        """

    @staticmethod
    def _rate(count, duration_sec):
        """Return ``count / duration_sec``, or 0 when the duration is 0.

        A flow whose reported duration is still zero has no meaningful rate
        yet; reporting 0 avoids a ZeroDivisionError.
        """
        return count / duration_sec if duration_sec else 0

    @staticmethod
    def _sort_flows(flows):
        """Sort flows in place by (priority, duration_sec), highest first."""
        flows.sort(key=lambda f: (f.priority, f.duration_sec), reverse=True)

    def flow_from_id(self, flow_id):
        """Flow from given flow_id.

        Searches the ``generic_flows`` of every switch and returns the first
        flow whose ``id`` equals ``flow_id``, or ``None``. Switches that
        have no ``generic_flows`` attribute yet are skipped.
        """
        for switch in self.controller.switches.copy().values():
            for flow in getattr(switch, 'generic_flows', []):
                if flow.id == flow_id:
                    return flow
        return None

    @rest('flow/match/<dpid>')
    def flow_match(self, dpid):
        """Return first flow matching request.

        Answers 404 "No match" when no flow matches, which includes the
        case of ``match_flows`` returning ``None``.
        """
        switch = self.controller.get_switch_by_dpid(dpid)
        flow = self.match_flows(switch, format_request(request.args), False)
        if flow:
            return jsonify(flow.to_dict())
        return "No match", 404

    @rest('flow/stats/<dpid>')
    def flow_stats(self, dpid):
        """Return all flows matching request.

        Answers 404 when the switch is unknown and an empty list when the
        switch has no flows to match against.
        """
        switch = self.controller.get_switch_by_dpid(dpid)
        if not switch:
            return f"switch {dpid} not found", 404
        # match_flows returns None when the switch has no flow list yet.
        flows = self.match_flows(switch, format_request(request.args), True)
        return jsonify([flow.to_dict() for flow in flows or []])

    @staticmethod
    def match_flows(switch, args, many=True):
        # pylint: disable=bad-staticmethod-argument
        """
        Match the packet in request against the flows installed in the switch.

        Try the match with each flow, in order. If many is True, tries the
        match with all flows, if False, tries until the first match.

        :param switch: object whose ``generic_flows`` are tried
        :param args: packet data
        :param many: Boolean, indicating whether to continue after matching the
                first flow or not
        :return: If many, the list of matched flows, or the matched flow.
                None when nothing matches (many False) or when an
                AttributeError is raised while matching, e.g. because the
                switch has no ``generic_flows``.
        """
        matches = []
        try:
            for flow in switch.generic_flows:
                match = flow.do_match(args)
                if not match:
                    continue
                if not many:
                    return match
                matches.append(match)
        except AttributeError:
            return None
        return matches if many else None

    @staticmethod
    def _apply_vlan_action(action_type, action, args):
        """Apply a push_vlan/pop_vlan/set_vlan action to ``args`` in place.

        ``args['vlan_vid']`` is handled as a stack: push appends a 0 tag,
        pop removes the last tag (dropping the key once the stack is empty)
        and set overwrites the last tag. Other action types are ignored.
        """
        if action_type == 'push_vlan':
            args.setdefault('vlan_vid', []).append(0)
        elif action_type == 'pop_vlan':
            if 'vlan_vid' in args:
                args['vlan_vid'].pop()
                if not args['vlan_vid']:
                    del args['vlan_vid']
        elif action_type == 'set_vlan':
            # NOTE(review): raises KeyError when the packet carries no
            # vlan_vid - confirm whether set_vlan can come without a tag.
            args['vlan_vid'][-1] = action.vlan_id

    @staticmethod
    def match_and_apply(switch, args):
        # pylint: disable=bad-staticmethod-argument
        """Match flows and apply actions.

        Match given packet (in args) against the switch flows and,
        if a match flow is found, apply its actions.

        Actions are only applied when ``switch.ofp_version`` is ``'0x04'``.

        :return: tuple (matched flow or None, args after the actions,
                port of the last output action or None)
        """
        flow = Main.match_flows(switch, args, False)
        port = None
        if flow and switch.ofp_version == '0x04':
            for action in flow.actions:
                action_type = action.action_type
                if action_type == 'output':
                    port = action.port
                else:
                    Main._apply_vlan_action(action_type, action, args)
        return flow, args, port

    @rest('packet_count/<flow_id>')
    def packet_count(self, flow_id):
        """Packet count of an specific flow.

        Answers 404 when no flow has the given id. The rate is 0 while the
        flow duration is still 0.
        """
        flow = self.flow_from_id(flow_id)
        if flow is None:
            return "Flow does not exist", 404
        packet_stats = {
            'flow_id': flow_id,
            'packet_counter': flow.packet_count,
            'packet_per_second': self._rate(flow.packet_count,
                                            flow.duration_sec)
            }
        return jsonify(packet_stats)

    @rest('bytes_count/<flow_id>')
    def bytes_count(self, flow_id):
        """Bytes count of an specific flow.

        Answers 404 when no flow has the given id. The rate is 0 while the
        flow duration is still 0.
        """
        flow = self.flow_from_id(flow_id)
        if flow is None:
            return "Flow does not exist", 404
        bytes_stats = {
            'flow_id': flow_id,
            'bytes_counter': flow.byte_count,
            'bits_per_second': self._rate(flow.byte_count * 8,
                                          flow.duration_sec)
            }
        return jsonify(bytes_stats)

    @rest('packet_count/per_flow/<dpid>')
    def packet_count_per_flow(self, dpid):
        """Per flow packet count."""
        return self.flows_counters('packet_count',
                                   dpid,
                                   counter='packet_counter',
                                   rate='packet_per_second')

    @rest('packet_count/sum/<dpid>')
    def packet_count_sum(self, dpid):
        """Sum of packet count flow stats."""
        return self.flows_counters('packet_count',
                                   dpid,
                                   total=True)

    @rest('bytes_count/per_flow/<dpid>')
    def bytes_count_per_flow(self, dpid):
        """Per flow bytes count."""
        return self.flows_counters('byte_count',
                                   dpid,
                                   counter='bytes_counter',
                                   rate='bits_per_second')

    @rest('bytes_count/sum/<dpid>')
    def bytes_count_sum(self, dpid):
        """Sum of bytes count flow stats."""
        return self.flows_counters('byte_count',
                                   dpid,
                                   total=True)

    def flows_counters(self, field, dpid, counter=None, rate=None,
                       total=False):
        """Calculate flows statistics.

        The returned statistics are either per flow or the sum of flows.

        :param field: name of the flow attribute to read
        :param dpid: switch whose flows are inspected
        :param counter: key used for the count in per-flow entries
                (defaults to ``field``)
        :param rate: key used for the per-second rate in per-flow entries
                (defaults to ``field``); a name starting with 'bits'
                multiplies the rate by 8
        :param total: when True, return the sum of ``field`` over all flows
        :return: JSON response, or a 404 tuple when the switch is unknown
        """
        # pylint: disable=too-many-arguments
        # pylint: disable=unused-variable
        start_date = request.args.get('start_date', 0)
        end_date = request.args.get('end_date', 0)
        # pylint: enable=unused-variable

        # We don't have statistics persistence yet, so for now this only works
        # for start and end equals to zero
        switch = self.controller.get_switch_by_dpid(dpid)
        if not switch:
            return f"switch {dpid} not found", 404
        flows = getattr(switch, 'generic_flows', [])

        if total:
            return jsonify(sum(getattr(flow, field) for flow in flows))

        counter = counter or field
        rate = rate or field
        in_bits = rate.startswith('bits')
        count_flows = []
        for flow in flows:
            count = getattr(flow, field)
            per_second = self._rate(count, flow.duration_sec)
            if in_bits:
                per_second *= 8
            count_flows.append({'flow_id': flow.id,
                                counter: count,
                                rate: per_second})

        return jsonify(count_flows)

    def handle_stats_reply(self, msg, switch):
        """Insert flows received in the switch list of flows.

        Flows of ``msg.body`` are accumulated in ``switch.generic_new_flows``
        (reset whenever the message xid differs from the last one stored for
        the switch). When the message is the last part, the accumulated list
        becomes ``switch.generic_flows``, sorted by priority and duration.
        An 'amlight/flow_stats.flows_updated' event is put on the app buffer
        when a message that is both a new xid and a last part changed the
        flow list.
        """
        old_flows = getattr(switch, 'generic_flows', [])
        xid = int(msg.header.xid)
        # No default for get(): the first reply of a switch is always a new
        # xid, even when that xid happens to be 0.
        is_new_xid = xid != self.switch_stats_xid.get(switch.id)
        # An even flags value marks the last part - presumably bit 0 is the
        # "more replies follow" flag; verify against the OpenFlow spec.
        is_last_part = msg.flags.value % 2 == 0
        lock = self.switch_stats_lock.setdefault(switch.id, Lock())
        with lock:
            if is_new_xid:
                switch.generic_new_flows = []
                self.switch_stats_xid[switch.id] = xid
            for flow_stats in msg.body:
                flow = GenericFlow.from_flow_stats(flow_stats,
                                                   switch.ofp_version)
                switch.generic_new_flows.append(flow)
            if is_last_part:
                switch.generic_flows = switch.generic_new_flows
                self._sort_flows(switch.generic_flows)
        # NOTE(review): a multi-part reply never satisfies both is_new_xid
        # and is_last_part on the same message - confirm this is intended.
        if is_new_xid and is_last_part and switch.generic_flows != old_flows:
            # Generate an event informing that flows have changed
            event = KytosEvent('amlight/flow_stats.flows_updated')
            event.content['switch'] = switch.dpid
            self.controller.buffers.app.put(event)

    @listen_to('kytos/of_core.flow_stats.received')
    def on_stats_received(self, event):
        """Capture flow stats messages for OpenFlow 1.3."""
        self.handle_stats_received(event)

    def handle_stats_received(self, event):
        """Handle flow stats messages for OpenFlow 1.3.

        Events without a 'replies_flows' entry in their content are ignored.
        """
        switch = event.content['switch']
        if 'replies_flows' in event.content:
            replies_flows = event.content['replies_flows']
            self.handle_stats_reply_received(switch, replies_flows)

    def handle_stats_reply_received(self, switch, replies_flows):
        """Iterate on the replies and set the generic flows.

        ``switch.generic_flows`` is replaced by the converted replies,
        sorted by priority and duration, highest first.
        """
        switch.generic_flows = [GenericFlow.from_replies_flows(flow)
                                for flow in replies_flows]
        self._sort_flows(switch.generic_flows)
471