Passed
Pull Request — master (#28)
by
unknown
03:02
created

build.main.Main.handle_stats_reply_0x04()   A

Complexity

Conditions 2

Size

Total Lines 6
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 2

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 6
ccs 5
cts 5
cp 1
rs 10
c 0
b 0
f 0
cc 2
nop 2
crap 2
1
"""Main module of amlight/flow_stats Kytos Network Application.

This NApp does operations with flows not covered by Kytos itself.
"""
5
# pylint: disable=too-many-return-statements,too-many-instance-attributes
6
# pylint: disable=too-many-arguments,too-many-branches,too-many-statements
7
8 1
import hashlib
9 1
import ipaddress
10 1
import json
11 1
from threading import Lock
12
13 1
import pyof.v0x01.controller2switch.common as common01
14 1
from flask import jsonify, request
15 1
from kytos.core import KytosEvent, KytosNApp, log, rest
16 1
from kytos.core.helpers import listen_to
17 1
from napps.amlight.flow_stats.utils import format_request
18 1
from napps.amlight.sdntrace import constants
19 1
from napps.kytos.of_core.v0x01.flow import Action as Action10
20 1
from napps.kytos.of_core.v0x04.flow import Action as Action13
21 1
from napps.kytos.of_core.v0x04.match_fields import MatchFieldFactory
22 1
from pyof.v0x01.common.flow_match import FlowWildCards
23 1
from pyof.v0x04.common.flow_instructions import InstructionType
24
25
26 1
class GenericFlow():
    """Class to represent a flow.

    This class represents a flow regardless of the OF version.

    For version '0x01' the ``match`` dict maps field names to plain values.
    For any other version each ``match`` entry is an object whose ``.value``
    attribute holds the matched value.
    """

    def __init__(self, version='0x01', match=None, idle_timeout=0,
                 hard_timeout=0, duration_sec=0, packet_count=0, byte_count=0,
                 priority=0, table_id=0xff, cookie=None, buffer_id=None,
                 actions=None):
        self.version = version
        # A falsy ``match``/``actions`` (including an empty container passed
        # by the caller) is replaced by a fresh, unshared container.
        self.match = match if match else {}
        self.idle_timeout = idle_timeout
        self.hard_timeout = hard_timeout
        self.duration_sec = duration_sec
        self.packet_count = packet_count
        self.byte_count = byte_count
        self.priority = priority
        self.table_id = table_id
        self.cookie = cookie
        self.buffer_id = buffer_id
        self.actions = actions if actions else []

    def __eq__(self, other):
        """Two flows are equal when their ``id`` digests are equal.

        Comparing against anything that is not a GenericFlow yields
        ``NotImplemented`` so Python falls back to its default handling
        instead of failing on a missing ``id`` attribute.
        """
        if not isinstance(other, GenericFlow):
            return NotImplemented
        return self.id == other.id

    # Defining __eq__ already makes instances unhashable; spelled out here
    # because ``id`` is derived from mutable attributes.
    __hash__ = None

    @property
    def id(self):
        # pylint: disable=invalid-name
        """Return the hash of the object.

        The md5 digest is fed, in this order, with the string form of:
        version, every match value (``.value`` of each entry when the
        version is not '0x01'), idle_timeout, hard_timeout, priority,
        table_id, cookie and buffer_id. Counters, duration and actions do
        not take part in the digest.

        Returns:
            string: Hash of object.
        """
        parts = [self.version]
        # Iterate over a copy so the dict may change while hashing.
        for value in self.match.copy().values():
            parts.append(value if self.version == '0x01' else value.value)
        parts.extend((self.idle_timeout, self.hard_timeout, self.priority,
                      self.table_id, self.cookie, self.buffer_id))

        hash_result = hashlib.md5()
        for part in parts:
            hash_result.update(str(part).encode('utf-8'))
        return hash_result.hexdigest()

    def to_dict(self):
        """Convert flow to a dictionary.

        Match fields are flattened into the top level of the result; the
        fixed keys written afterwards win over a match field with the same
        name. Each action contributes its ``as_dict()`` result.
        """
        flow_dict = {'version': self.version}
        if self.version == '0x01':
            flow_dict.update(self.match)
        else:
            flow_dict.update(self.match_to_dict())
        flow_dict['idle_timeout'] = self.idle_timeout
        flow_dict['hard_timeout'] = self.hard_timeout
        flow_dict['priority'] = self.priority
        flow_dict['table_id'] = self.table_id
        flow_dict['cookie'] = self.cookie
        flow_dict['buffer_id'] = self.buffer_id
        flow_dict['actions'] = [action.as_dict() for action in self.actions]
        return flow_dict

    def match_to_dict(self):
        """Convert a match in OF 1.3 to a dictionary of plain values."""
        return {name: field.value
                for name, field in self.match.copy().items()}

    def to_json(self):
        """Return a json version of the flow."""
        return json.dumps(self.to_dict())

    @classmethod
    def from_flow_stats(cls, flow_stats, version='0x01'):
        """Create a flow from OF flow stats.

        Args:
            flow_stats: flow stats entry; timeouts, priority, table id,
                duration and counters are read from the ``.value`` of the
                attributes of the same name.
            version: '0x01' or '0x04'. For any other value the match and
                the actions are left empty.

        Returns:
            The new flow. ``cookie`` and ``buffer_id`` are not read from
            ``flow_stats`` and keep their defaults.
        """
        flow = cls(version=version)
        flow.idle_timeout = flow_stats.idle_timeout.value
        flow.hard_timeout = flow_stats.hard_timeout.value
        flow.priority = flow_stats.priority.value
        flow.table_id = flow_stats.table_id.value
        flow.duration_sec = flow_stats.duration_sec.value
        flow.packet_count = flow_stats.packet_count.value
        flow.byte_count = flow_stats.byte_count.value
        if version == '0x01':
            of_match = flow_stats.match
            # OF 1.0 field names are stored under the names used for
            # OF 1.3 so both versions share the same keys.
            flow.match['wildcards'] = of_match.wildcards.value
            flow.match['in_port'] = of_match.in_port.value
            flow.match['eth_src'] = of_match.dl_src.value
            flow.match['eth_dst'] = of_match.dl_dst.value
            flow.match['vlan_vid'] = of_match.dl_vlan.value
            flow.match['vlan_pcp'] = of_match.dl_vlan_pcp.value
            flow.match['eth_type'] = of_match.dl_type.value
            flow.match['ip_tos'] = of_match.nw_tos.value
            flow.match['ipv4_src'] = of_match.nw_src.value
            flow.match['ipv4_dst'] = of_match.nw_dst.value
            flow.match['ip_proto'] = of_match.nw_proto.value
            flow.match['tcp_src'] = of_match.tp_src.value
            flow.match['tcp_dst'] = of_match.tp_dst.value
            flow.actions = [Action10.from_of_action(of_action)
                            for of_action in flow_stats.actions]
        elif version == '0x04':
            for match in flow_stats.match.oxm_match_fields:
                match_field = MatchFieldFactory.from_of_tlv(match)
                field_name = match_field.name
                if field_name == 'dl_vlan':
                    field_name = 'vlan_vid'
                flow.match[field_name] = match_field
            flow.actions = []
            # Only actions carried by APPLY_ACTIONS instructions are kept.
            for instruction in flow_stats.instructions:
                if instruction.instruction_type == \
                        InstructionType.OFPIT_APPLY_ACTIONS:
                    for of_action in instruction.actions:
                        action = Action13.from_of_action(of_action)
                        flow.actions.append(action)
        return flow

    def do_match(self, args):
        """Match a packet against this flow.

        Returns:
            This flow when the packet matches, False when it does not, and
            None when the flow version is neither '0x01' nor '0x04'.
        """
        if self.version == '0x01':
            return self.match10(args)
        if self.version == '0x04':
            return self.match13(args)
        return None

    def _field_matches10(self, args, wildcard_flag, match_key, arg_key,
                         convert):
        """Check one exact-match OF 1.0 field unless it is wildcarded."""
        if self.match['wildcards'] & wildcard_flag:
            return True
        if arg_key not in args:
            return False
        return self.match[match_key] == convert(args[arg_key])

    def _ipv4_matches10(self, args, field, wildcard_mask, wildcard_shift):
        """Check an OF 1.0 IPv4 address, ignoring the wildcarded low bits."""
        flow_ip_int = int(ipaddress.IPv4Address(self.match[field]))
        if flow_ip_int == 0:
            return True
        ignored_bits = min(
            (self.match['wildcards'] & wildcard_mask) >> wildcard_shift, 32)
        if ignored_bits == 32:
            # Every bit is wildcarded: nothing to compare, so the packet
            # does not need to carry this address at all.
            return True
        if field not in args:
            return False
        mask = (0xffffffff << ignored_bits) & 0xffffffff
        ip_int = int(ipaddress.IPv4Address(args[field]))
        return ip_int & mask == flow_ip_int & mask

    def match10(self, args):
        """Match a packet against this flow (OF1.0).

        Every field that is not wildcarded must be present in ``args`` and
        equal to the flow value. The IP level fields are only checked when
        the flow ``eth_type`` is IPv4. Transport ports are read from
        ``args`` under 'tp_src'/'tp_dst' and compared with the flow's
        'tcp_src'/'tcp_dst'. For 'vlan_vid' the last element of the list in
        ``args`` is compared.

        Returns:
            This flow on a match, False otherwise.
        """
        log.debug('Matching packet')

        def unchanged(value):
            return value

        def last_tag(tags):
            return tags[-1]

        link_checks = (
            (FlowWildCards.OFPFW_IN_PORT, 'in_port', 'in_port', int),
            (FlowWildCards.OFPFW_DL_VLAN_PCP, 'vlan_pcp', 'vlan_pcp', int),
            (FlowWildCards.OFPFW_DL_VLAN, 'vlan_vid', 'vlan_vid', last_tag),
            (FlowWildCards.OFPFW_DL_SRC, 'eth_src', 'eth_src', unchanged),
            (FlowWildCards.OFPFW_DL_DST, 'eth_dst', 'eth_dst', unchanged),
            (FlowWildCards.OFPFW_DL_TYPE, 'eth_type', 'eth_type', int),
        )
        if not all(self._field_matches10(args, *check)
                   for check in link_checks):
            return False

        if self.match['eth_type'] != constants.IPV4:
            return self

        if not self._ipv4_matches10(args, 'ipv4_src',
                                    FlowWildCards.OFPFW_NW_SRC_MASK,
                                    FlowWildCards.OFPFW_NW_SRC_SHIFT):
            return False
        if not self._ipv4_matches10(args, 'ipv4_dst',
                                    FlowWildCards.OFPFW_NW_DST_MASK,
                                    FlowWildCards.OFPFW_NW_DST_SHIFT):
            return False

        ip_checks = (
            (FlowWildCards.OFPFW_NW_TOS, 'ip_tos', 'ip_tos', int),
            (FlowWildCards.OFPFW_NW_PROTO, 'ip_proto', 'ip_proto', int),
            (FlowWildCards.OFPFW_TP_SRC, 'tcp_src', 'tp_src', int),
            (FlowWildCards.OFPFW_TP_DST, 'tcp_dst', 'tp_dst', int),
        )
        if not all(self._field_matches10(args, *check)
                   for check in ip_checks):
            return False
        return self

    def match13(self, args):
        """Match a packet against this flow (OF1.3).

        Every match field of the flow must be present in ``args``. IP
        address fields are compared after applying the netmask of the flow
        field; every other field must be equal to the flow value. For
        'vlan_vid' the last element of the list in ``args`` is compared.

        Returns:
            This flow on a match, False otherwise.
        """
        ip_fields = ('ipv4_src', 'ipv4_dst', 'ipv6_src', 'ipv6_dst')
        for name, match_field in self.match.copy().items():
            if name not in args:
                return False
            field = args[name][-1] if name == 'vlan_vid' else args[name]
            if name in ip_fields:
                packet_ip = int(ipaddress.ip_address(field))
                ip_addr = match_field.value
                if packet_ip & ip_addr.netmask != ip_addr.address:
                    return False
            elif match_field.value != field:
                return False
        return self
281
282
283 1
class Main(KytosNApp):
    """Main class of amlight/flow_stats NApp.

    This class is the entry point for this napp. It keeps, per switch, a
    ``generic_flows`` list of GenericFlow objects built from flow stats
    messages, and exposes REST endpoints to match packets against those
    flows and to read their counters.
    """

    def setup(self):
        """Replace the '__init__' method for the KytosNApp subclass.

        The setup method is automatically called by the controller when your
        application is loaded.

        So, if you have any setup routine, insert it here.
        """
        log.info('Starting Kytos/Amlight flow manager')
        for switch in self.controller.switches.copy().values():
            switch.generic_flows = []
        # Last stats xid seen and the lock guarding stats handling,
        # both keyed by switch id.
        self.switch_stats_xid = {}
        self.switch_stats_lock = {}

    def execute(self):
        """This method is executed right after the setup method execution.

        You can also use this method in loop mode if you add to the above setup
        method a line like the following example:

            self.execute_as_loop(30)  # 30-second interval.
        """

    def shutdown(self):
        """This method is executed when your napp is unloaded.

        If you have some cleanup procedure, insert it here.
        """

    @staticmethod
    def _per_second(count, duration_sec):
        """Return count / duration_sec, or 0 when the duration is zero."""
        if not duration_sec:
            return 0
        return count / duration_sec

    @staticmethod
    def _set_vlan(args, vlan_id):
        """Overwrite the last VLAN id in args, or create the list."""
        if args.get('vlan_vid'):
            args['vlan_vid'][-1] = vlan_id
        else:
            args['vlan_vid'] = [vlan_id]

    def flow_from_id(self, flow_id):
        """Flow from given flow_id.

        Returns the first flow, over all known switches, whose ``id`` equals
        ``flow_id``, or None. Switches that have no ``generic_flows``
        attribute yet are skipped.
        """
        for switch in self.controller.switches.copy().values():
            for flow in getattr(switch, 'generic_flows', []):
                if flow.id == flow_id:
                    return flow
        return None

    @rest('flow/match/<dpid>')
    def flow_match(self, dpid):
        """Return first flow matching request.

        Answers 404 "No match" when no flow matches; match_flows also
        yields no flow for a switch that cannot be inspected.
        """
        switch = self.controller.get_switch_by_dpid(dpid)
        flow = self.match_flows(switch, format_request(request.args), False)
        if flow:
            return jsonify(flow.to_dict())
        return "No match", 404

    @rest('flow/stats/<dpid>')
    def flow_stats(self, dpid):
        """Return all flows matching request.

        Answers 404 when the switch is unknown and an empty list when the
        switch has no flows to match against.
        """
        switch = self.controller.get_switch_by_dpid(dpid)
        if not switch:
            return f"switch {dpid} not found", 404
        flows = self.match_flows(switch, format_request(request.args), True)
        # match_flows returns None when the switch has no generic_flows.
        return jsonify([flow.to_dict() for flow in flows or []])

    @staticmethod
    def match_flows(switch, args, many=True):
        # pylint: disable=bad-staticmethod-argument
        """
        Match the packet in request against the flows installed in the switch.

        Try the match with each flow, in order. If many is True, tries the
        match with all flows, if False, tries until the first match.
        :param switch: object holding the ``generic_flows`` list
        :param args: packet data
        :param many: Boolean, indicating whether to continue after matching the
                first flow or not
        :return: If many, the list of matched flows; otherwise the first
                matched flow or None. None is also returned, whatever the
                value of many, when an AttributeError is raised while
                matching (e.g. the switch has no ``generic_flows``).
        """
        matched = []
        try:
            for flow in switch.generic_flows:
                match = flow.do_match(args)
                if not match:
                    continue
                if not many:
                    return match
                matched.append(match)
        except AttributeError:
            return None
        return matched if many else None

    @staticmethod
    def match_and_apply(switch, args):
        # pylint: disable=bad-staticmethod-argument
        """Match flows and apply actions.

        Match given packet (in args) against the switch flows and,
        if a match flow is found, apply its actions.

        ``args`` is modified in place: VLAN actions rewrite, push to or pop
        from the ``args['vlan_vid']`` list.

        :return: tuple (matched flow or None, args, output port or None)
        """
        flow = Main.match_flows(switch, args, False)
        port = None
        if not flow:
            return flow, args, port

        if switch.ofp_version == '0x01':
            for action in flow.actions:
                action_type = action.action_type
                if action_type == 'output':
                    port = action.port
                elif action_type == 'set_vlan':
                    Main._set_vlan(args, action.vlan_id)
        elif switch.ofp_version == '0x04':
            for action in flow.actions:
                action_type = action.action_type
                if action_type == 'output':
                    port = action.port
                elif action_type == 'push_vlan':
                    # The pushed tag starts as 0; a following set_vlan
                    # action overwrites it.
                    args.setdefault('vlan_vid', []).append(0)
                elif action_type == 'pop_vlan':
                    if 'vlan_vid' in args:
                        args['vlan_vid'].pop()
                        if not args['vlan_vid']:
                            del args['vlan_vid']
                elif action_type == 'set_vlan':
                    # Same handling as OF 1.0 so a set_vlan on an untagged
                    # packet does not raise KeyError.
                    Main._set_vlan(args, action.vlan_id)
        return flow, args, port

    @rest('packet_count/<flow_id>')
    def packet_count(self, flow_id):
        """Packet count of an specific flow.

        The rate is reported as 0 while the flow duration is still zero.
        """
        flow = self.flow_from_id(flow_id)
        if flow is None:
            return "Flow does not exist", 404
        packet_stats = {
            'flow_id': flow_id,
            'packet_counter': flow.packet_count,
            'packet_per_second': self._per_second(flow.packet_count,
                                                  flow.duration_sec)
            }
        return jsonify(packet_stats)

    @rest('bytes_count/<flow_id>')
    def bytes_count(self, flow_id):
        """Bytes count of an specific flow.

        The rate is reported as 0 while the flow duration is still zero.
        """
        flow = self.flow_from_id(flow_id)
        if flow is None:
            return "Flow does not exist", 404
        bytes_stats = {
            'flow_id': flow_id,
            'bytes_counter': flow.byte_count,
            'bits_per_second': self._per_second(flow.byte_count * 8,
                                                flow.duration_sec)
            }
        return jsonify(bytes_stats)

    @rest('packet_count/per_flow/<dpid>')
    def packet_count_per_flow(self, dpid):
        """Per flow packet count."""
        return self.flows_counters('packet_count',
                                   dpid,
                                   counter='packet_counter',
                                   rate='packet_per_second')

    @rest('packet_count/sum/<dpid>')
    def packet_count_sum(self, dpid):
        """Sum of packet count flow stats."""
        return self.flows_counters('packet_count',
                                   dpid,
                                   total=True)

    @rest('bytes_count/per_flow/<dpid>')
    def bytes_count_per_flow(self, dpid):
        """Per flow bytes count."""
        return self.flows_counters('byte_count',
                                   dpid,
                                   counter='bytes_counter',
                                   rate='bits_per_second')

    @rest('bytes_count/sum/<dpid>')
    def bytes_count_sum(self, dpid):
        """Sum of bytes count flow stats."""
        return self.flows_counters('byte_count',
                                   dpid,
                                   total=True)

    def flows_counters(self, field, dpid, counter=None, rate=None,
                       total=False):
        """Calculate flows statistics.

        The returned statistics are both per flow and for the sum of flows

        :param field: name of the flow attribute holding the counter
        :param dpid: switch to read the flows from
        :param counter: key used for the counter in the per flow result
                (defaults to field)
        :param rate: key used for the rate in the per flow result (defaults
                to field); a key starting with 'bits' multiplies the rate
                by 8
        :param total: if True, return only the sum of the counter
        :return: JSON with the sum, or with a list of per flow dicts; 404
                when the switch is unknown
        """
        # pylint: disable=too-many-arguments
        # We don't have statistics persistence yet, so the 'start_date' and
        # 'end_date' request arguments are not taken into account.
        switch = self.controller.get_switch_by_dpid(dpid)
        if not switch:
            return f"switch {dpid} not found", 404
        flows = getattr(switch, 'generic_flows', [])

        if total:
            return jsonify(sum(getattr(flow, field) for flow in flows))

        counter = counter or field
        rate = rate or field
        in_bits = rate.startswith('bits')
        count_flows = []
        for flow in flows:
            count = getattr(flow, field)
            per_second = self._per_second(count, flow.duration_sec)
            if in_bits:
                per_second *= 8
            count_flows.append({'flow_id': flow.id,
                                counter: count,
                                rate: per_second})
        return jsonify(count_flows)

    @listen_to('kytos/of_core.v0x01.messages.in.ofpt_stats_reply')
    def on_stats_reply_0x01(self, event):
        """Capture flow stats messages for v0x01 switches."""
        msg = event.content['message']
        if msg.body_type == common01.StatsType.OFPST_FLOW:
            switch = event.source.switch
            self.handle_stats_reply(msg, switch)

    def handle_stats_reply(self, msg, switch):
        """Insert flows received in the switch list of flows.

        Flows of a reply are accumulated in ``switch.generic_new_flows``
        and only replace ``switch.generic_flows`` when the last part of the
        reply arrives. An 'amlight/flow_stats.flows_updated' event is put in
        the app buffer when a single-part reply changes the flows.
        """
        old_flows = getattr(switch, 'generic_flows', [])
        xid = int(msg.header.xid)
        is_new_xid = xid != self.switch_stats_xid.get(switch.id, 0)
        # An even flags value (bit 0 clear) is treated as the final part.
        is_last_part = msg.flags.value % 2 == 0
        lock = self.switch_stats_lock.setdefault(switch.id, Lock())
        with lock:
            # Also start a fresh list when none exists yet, which happens
            # when the very first reply carries the default xid 0.
            if is_new_xid or not hasattr(switch, 'generic_new_flows'):
                switch.generic_new_flows = []
                self.switch_stats_xid[switch.id] = xid
            for flow_stats in msg.body:
                flow = GenericFlow.from_flow_stats(flow_stats,
                                                   switch.ofp_version)
                switch.generic_new_flows.append(flow)
            if is_last_part:
                switch.generic_flows = switch.generic_new_flows
                switch.generic_flows.sort(
                    key=lambda f: (f.priority, f.duration_sec),
                    reverse=True
                )
        # NOTE(review): for a multi-part reply the last part is not a new
        # xid, so no event is generated in that case - confirm intended.
        if is_new_xid and is_last_part and switch.generic_flows != old_flows:
            # Generate an event informing that flows have changed
            event = KytosEvent('amlight/flow_stats.flows_updated')
            event.content['switch'] = switch.dpid
            self.controller.buffers.app.put(event)

    @listen_to('kytos/of_core.flow_stats.received')
    def on_stats_received(self, event):
        """Capture flow stats messages for OpenFlow 1.3."""
        self.handle_stats_received(event)

    def handle_stats_received(self, event):
        """Handle flow stats messages for OpenFlow 1.3.

        Events without a 'replies_flows' entry are ignored.
        """
        switch = event.content['switch']
        if 'replies_flows' in event.content:
            replies_flows = event.content['replies_flows']
            self.handle_stats_reply_received(switch, replies_flows)

    @classmethod
    def handle_stats_reply_received(cls, switch, replies_flows):
        """Iterate on the replies and set the generic flows.

        The switch flows are replaced, not extended, so that repeated stats
        events do not pile up duplicates of the same flows. The result is
        sorted by priority and then duration, highest first.
        NOTE(review): assumes 'replies_flows' carries the complete flow
        list of the switch - confirm against the of_core event producer.
        """
        flows = [GenericFlow.from_flow_stats(flow, switch.ofp_version)
                 for flow in replies_flows]
        flows.sort(
            key=lambda f: (f.priority, f.duration_sec),
            reverse=True
        )
        switch.generic_flows = flows
575