Passed
Push — master ( c58024...645373 )
by Vinicius
01:59 queued 17s
created

build.main.Main.on_stats_reply_0x01()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1.037

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 4
ccs 2
cts 3
cp 0.6667
rs 10
c 0
b 0
f 0
cc 1
nop 2
crap 1.037
1
"""Main module of amlight/flow_stats Kytos Network Application.
2
3
This NApp does operations with flows not covered by Kytos itself.
4
"""
5
# pylint: disable=too-many-return-statements,too-many-instance-attributes
6
# pylint: disable=too-many-arguments,too-many-branches,too-many-statements
7
8 1
import hashlib
9 1
import ipaddress
10 1
import json
11 1
from threading import Lock
12
13 1
import pyof.v0x01.controller2switch.common as common01
14 1
import pyof.v0x04.controller2switch.common as common04
15 1
from flask import jsonify, request
16 1
from kytos.core import KytosEvent, KytosNApp, log, rest
17 1
from kytos.core.helpers import listen_to
18 1
from napps.amlight.flow_stats.utils import format_request
19 1
from napps.amlight.sdntrace import constants
20 1
from napps.kytos.of_core.v0x01.flow import Action as Action10
21 1
from napps.kytos.of_core.v0x04.flow import Action as Action13
22 1
from napps.kytos.of_core.v0x04.match_fields import MatchFieldFactory
23 1
from pyof.v0x01.common.flow_match import FlowWildCards
24 1
from pyof.v0x04.common.flow_instructions import InstructionType
25
26
27 1
class GenericFlow():
    """Represent a flow regardless of the OpenFlow version.

    For OF 1.0 (``version == '0x01'``) ``match`` maps field names to plain
    values. For any other version the values are match-field objects whose
    payload is read through their ``.value`` attribute.
    """

    def __init__(self, version='0x01', match=None, idle_timeout=0,
                 hard_timeout=0, duration_sec=0, packet_count=0, byte_count=0,
                 priority=0, table_id=0xff, cookie=None, buffer_id=None,
                 actions=None):
        self.version = version
        self.match = match if match else {}
        self.idle_timeout = idle_timeout
        self.hard_timeout = hard_timeout
        self.duration_sec = duration_sec
        self.packet_count = packet_count
        self.byte_count = byte_count
        self.priority = priority
        self.table_id = table_id
        self.cookie = cookie
        self.buffer_id = buffer_id
        self.actions = actions if actions else []

    def __eq__(self, other):
        """Compare two flows by their ``id``.

        Returns ``NotImplemented`` for objects that are not flows, so that
        ``flow == None`` evaluates to False instead of raising.
        """
        if not isinstance(other, GenericFlow):
            return NotImplemented
        return self.id == other.id

    @property
    def id(self):
        # pylint: disable=invalid-name
        """Return the hash of the object.

        The md5 digest covers the version, the match values, the timeouts,
        priority, table id, cookie and buffer id. Counters, duration and
        actions are not part of it.

        Returns:
            string: Hash of object.
        """
        hash_result = hashlib.md5()
        hash_result.update(str(self.version).encode('utf-8'))
        for value in self.match.values():
            if self.version == '0x01':
                hash_result.update(str(value).encode('utf-8'))
            else:
                # Non-1.0 matches hold field objects; hash their payload.
                hash_result.update(str(value.value).encode('utf-8'))
        hash_result.update(str(self.idle_timeout).encode('utf-8'))
        hash_result.update(str(self.hard_timeout).encode('utf-8'))
        hash_result.update(str(self.priority).encode('utf-8'))
        hash_result.update(str(self.table_id).encode('utf-8'))
        hash_result.update(str(self.cookie).encode('utf-8'))
        hash_result.update(str(self.buffer_id).encode('utf-8'))

        return hash_result.hexdigest()

    def to_dict(self):
        """Convert flow to a dictionary."""
        flow_dict = {'version': self.version}
        if self.version == '0x01':
            flow_dict.update(self.match)
        else:
            flow_dict.update(self.match_to_dict())
        flow_dict['idle_timeout'] = self.idle_timeout
        flow_dict['hard_timeout'] = self.hard_timeout
        flow_dict['priority'] = self.priority
        flow_dict['table_id'] = self.table_id
        flow_dict['cookie'] = self.cookie
        flow_dict['buffer_id'] = self.buffer_id
        flow_dict['actions'] = [action.as_dict() for action in self.actions]

        return flow_dict

    def match_to_dict(self):
        """Convert a match in OF 1.3 to a dictionary."""
        return {name: field.value for name, field in self.match.items()}

    def to_json(self):
        """Return a json version of the flow."""
        return json.dumps(self.to_dict())

    @classmethod
    def from_flow_stats(cls, flow_stats, version='0x01'):
        """Create a flow from OF flow stats.

        Versions other than '0x01' and '0x04' yield a flow with the timeouts,
        priority, table id and counters set, but with an empty match and no
        actions.
        """
        flow = cls(version=version)
        flow.idle_timeout = flow_stats.idle_timeout.value
        flow.hard_timeout = flow_stats.hard_timeout.value
        flow.priority = flow_stats.priority.value
        flow.table_id = flow_stats.table_id.value
        flow.duration_sec = flow_stats.duration_sec.value
        flow.packet_count = flow_stats.packet_count.value
        flow.byte_count = flow_stats.byte_count.value
        if version == '0x01':
            of_match = flow_stats.match
            flow.match['wildcards'] = of_match.wildcards.value
            flow.match['in_port'] = of_match.in_port.value
            flow.match['eth_src'] = of_match.dl_src.value
            flow.match['eth_dst'] = of_match.dl_dst.value
            flow.match['vlan_vid'] = of_match.dl_vlan.value
            flow.match['vlan_pcp'] = of_match.dl_vlan_pcp.value
            flow.match['eth_type'] = of_match.dl_type.value
            flow.match['ip_tos'] = of_match.nw_tos.value
            flow.match['ipv4_src'] = of_match.nw_src.value
            flow.match['ipv4_dst'] = of_match.nw_dst.value
            flow.match['ip_proto'] = of_match.nw_proto.value
            flow.match['tcp_src'] = of_match.tp_src.value
            flow.match['tcp_dst'] = of_match.tp_dst.value
            flow.actions = [Action10.from_of_action(of_action)
                            for of_action in flow_stats.actions]
        elif version == '0x04':
            for match in flow_stats.match.oxm_match_fields:
                match_field = MatchFieldFactory.from_of_tlv(match)
                field_name = match_field.name
                if field_name == 'dl_vlan':
                    field_name = 'vlan_vid'
                flow.match[field_name] = match_field
            flow.actions = []
            for instruction in flow_stats.instructions:
                # Only actions from apply-actions instructions are kept.
                if instruction.instruction_type == \
                        InstructionType.OFPIT_APPLY_ACTIONS:
                    for of_action in instruction.actions:
                        action = Action13.from_of_action(of_action)
                        flow.actions.append(action)
        return flow

    def do_match(self, args):
        """Match a packet against this flow.

        Returns the flow itself on a match, False on a mismatch and None
        when the flow version is neither '0x01' nor '0x04'.
        """
        if self.version == '0x01':
            return self.match10(args)
        if self.version == '0x04':
            return self.match13(args)
        return None

    def _match10_ip(self, args, field, wildcard_mask, wildcard_shift):
        """Check one OF 1.0 IPv4 address field against the packet.

        ``wildcard_mask`` and ``wildcard_shift`` select, inside the flow
        wildcards, the count of low-order address bits to ignore. Returns
        True when the packet satisfies the constraint, False otherwise.
        """
        flow_ip_int = int(ipaddress.IPv4Address(self.match[field]))
        if flow_ip_int == 0:
            return True
        ignored_bits = ((self.match['wildcards'] & wildcard_mask) >>
                        wildcard_shift)
        if ignored_bits >= 32:
            # Every bit is wildcarded, so the packet address is irrelevant
            # and may legitimately be absent from args.
            return True
        if field not in args:
            return False
        mask = (0xffffffff << ignored_bits) & 0xffffffff
        ip_int = int(ipaddress.IPv4Address(args[field]))
        return ip_int & mask == flow_ip_int & mask

    def match10(self, args):
        """Match a packet against this flow (OF1.0).

        A field is only compared when its bit is not set in the flow
        wildcards. Returns the flow itself on a match, False otherwise.
        """
        log.debug('Matching packet')
        wildcards = self.match['wildcards']
        if not wildcards & FlowWildCards.OFPFW_IN_PORT:
            if 'in_port' not in args:
                return False
            if self.match['in_port'] != int(args['in_port']):
                return False
        if not wildcards & FlowWildCards.OFPFW_DL_VLAN_PCP:
            if 'vlan_pcp' not in args:
                return False
            if self.match['vlan_pcp'] != int(args['vlan_pcp']):
                return False
        if not wildcards & FlowWildCards.OFPFW_DL_VLAN:
            if 'vlan_vid' not in args:
                return False
            # args['vlan_vid'] is a sequence; its last item is compared.
            if self.match['vlan_vid'] != args['vlan_vid'][-1]:
                return False
        if not wildcards & FlowWildCards.OFPFW_DL_SRC:
            if 'eth_src' not in args:
                return False
            if self.match['eth_src'] != args['eth_src']:
                return False
        if not wildcards & FlowWildCards.OFPFW_DL_DST:
            if 'eth_dst' not in args:
                return False
            if self.match['eth_dst'] != args['eth_dst']:
                return False
        if not wildcards & FlowWildCards.OFPFW_DL_TYPE:
            if 'eth_type' not in args:
                return False
            if self.match['eth_type'] != int(args['eth_type']):
                return False
        if self.match['eth_type'] == constants.IPV4:
            if not self._match10_ip(args, 'ipv4_src',
                                    FlowWildCards.OFPFW_NW_SRC_MASK,
                                    FlowWildCards.OFPFW_NW_SRC_SHIFT):
                return False
            if not self._match10_ip(args, 'ipv4_dst',
                                    FlowWildCards.OFPFW_NW_DST_MASK,
                                    FlowWildCards.OFPFW_NW_DST_SHIFT):
                return False
            if not wildcards & FlowWildCards.OFPFW_NW_TOS:
                if 'ip_tos' not in args:
                    return False
                if self.match['ip_tos'] != int(args['ip_tos']):
                    return False
            if not wildcards & FlowWildCards.OFPFW_NW_PROTO:
                if 'ip_proto' not in args:
                    return False
                if self.match['ip_proto'] != int(args['ip_proto']):
                    return False
            # The packet uses 'tp_*' keys while the flow stores 'tcp_*'.
            if not wildcards & FlowWildCards.OFPFW_TP_SRC:
                if 'tp_src' not in args:
                    return False
                if self.match['tcp_src'] != int(args['tp_src']):
                    return False
            if not wildcards & FlowWildCards.OFPFW_TP_DST:
                if 'tp_dst' not in args:
                    return False
                if self.match['tcp_dst'] != int(args['tp_dst']):
                    return False
        return self

    def match13(self, args):
        """Match a packet against this flow (OF1.3).

        Every field present in the flow match must be present in ``args``.
        Returns the flow itself on a match, False otherwise.
        """
        for name, match_field in self.match.items():
            if name not in args:
                return False
            # args['vlan_vid'] is a sequence; its last item is compared.
            field = args[name][-1] if name == 'vlan_vid' else args[name]
            if name not in ('ipv4_src', 'ipv4_dst', 'ipv6_src', 'ipv6_dst'):
                if match_field.value != field:
                    return False
            else:
                packet_ip = int(ipaddress.ip_address(field))
                ip_addr = match_field.value
                if packet_ip & ip_addr.netmask != ip_addr.address:
                    return False
        return self
282
283
284 1
class Main(KytosNApp):
    """Main class of amlight/flow_stats NApp.

    This class is the entry point for this napp. It keeps, on each switch
    object, a ``generic_flows`` list built from flow stats replies, and
    exposes REST endpoints to match packets against those flows and to read
    their counters.
    """

    def setup(self):
        """Replace the '__init__' method for the KytosNApp subclass.

        The setup method is automatically called by the controller when your
        application is loaded.

        So, if you have any setup routine, insert it here.
        """
        log.info('Starting Kytos/Amlight flow manager')
        for switch in self.controller.switches.values():
            switch.generic_flows = []
        # Per-switch xid of the stats reply currently being assembled.
        self.switch_stats_xid = {}
        # Per-switch lock guarding the flow lists in handle_stats_reply.
        self.switch_stats_lock = {}

    def execute(self):
        """This method is executed right after the setup method execution.

        You can also use this method in loop mode if you add to the above setup
        method a line like the following example:

            self.execute_as_loop(30)  # 30-second interval.
        """

    def shutdown(self):
        """This method is executed when your napp is unloaded.

        If you have some cleanup procedure, insert it here.
        """

    @staticmethod
    def _per_second(count, duration_sec):
        """Return ``count / duration_sec``, or 0 when the duration is zero."""
        if not duration_sec:
            return 0
        return count / duration_sec

    def flow_from_id(self, flow_id):
        """Return the first flow with the given flow_id, or None."""
        for switch in self.controller.switches.values():
            # A switch that has not been given a flow list is skipped.
            for flow in getattr(switch, 'generic_flows', []):
                if flow.id == flow_id:
                    return flow
        return None

    @rest('flow/match/<dpid>')
    def flow_match(self, dpid):
        """Return first flow matching request."""
        switch = self.controller.get_switch_by_dpid(dpid)
        flow = self.match_flows(switch, format_request(request.args), False)
        if flow:
            return jsonify(flow.to_dict())
        return "No match", 404

    @rest('flow/stats/<dpid>')
    def flow_stats(self, dpid):
        """Return all flows matching request."""
        switch = self.controller.get_switch_by_dpid(dpid)
        if not switch:
            return f"switch {dpid} not found", 404
        flows = self.match_flows(switch, format_request(request.args), True)
        # match_flows returns None when it hits an AttributeError, e.g. a
        # switch without a flow list; report that as "no flows".
        return jsonify([flow.to_dict() for flow in flows or []])

    @staticmethod
    def match_flows(switch, args, many=True):
        # pylint: disable=bad-staticmethod-argument
        """
        Match the packet in request against the flows installed in the switch.

        Try the match with each flow, in order. If many is True, tries the
        match with all flows, if False, tries until the first match.
        :param switch: object whose generic_flows list is searched
        :param args: packet data
        :param many: Boolean, indicating whether to continue after matching the
                first flow or not
        :return: If many, the list of matched flows, otherwise the matched
                flow or None. None is also returned when an AttributeError
                is raised while reading or matching the flows.
        """
        response = []
        try:
            for flow in switch.generic_flows:
                match = flow.do_match(args)
                if match:
                    if not many:
                        return match
                    response.append(match)
        except AttributeError:
            return None
        return response if many else None

    @staticmethod
    def match_and_apply(switch, args):
        # pylint: disable=bad-staticmethod-argument
        """Match flows and apply actions.

        Match given packet (in args) against the switch flows and,
        if a match flow is found, apply its actions.

        ``args`` is modified in place by the VLAN actions. Returns a tuple
        ``(flow, args, port)`` where ``port`` is the port of the last output
        action, or None when there is none.
        """
        flow = Main.match_flows(switch, args, False)
        port = None
        # pylint: disable=too-many-nested-blocks
        if flow:
            actions = flow.actions
            if switch.ofp_version == '0x01':
                for action in actions:
                    action_type = action.action_type
                    if action_type == 'output':
                        port = action.port
                    elif action_type == 'set_vlan':
                        if 'vlan_vid' in args:
                            args['vlan_vid'][-1] = action.vlan_id
                        else:
                            args['vlan_vid'] = [action.vlan_id]
            elif switch.ofp_version == '0x04':
                for action in actions:
                    action_type = action.action_type
                    if action_type == 'output':
                        port = action.port
                    if action_type == 'push_vlan':
                        if 'vlan_vid' not in args:
                            args['vlan_vid'] = []
                        args['vlan_vid'].append(0)
                    if action_type == 'pop_vlan':
                        if 'vlan_vid' in args:
                            args['vlan_vid'].pop()
                            if len(args['vlan_vid']) == 0:
                                del args['vlan_vid']
                    if action_type == 'set_vlan':
                        # Same handling as the OF1.0 branch: an untagged
                        # packet gets a new tag list instead of a KeyError.
                        if 'vlan_vid' in args:
                            args['vlan_vid'][-1] = action.vlan_id
                        else:
                            args['vlan_vid'] = [action.vlan_id]
        return flow, args, port

    @rest('packet_count/<flow_id>')
    def packet_count(self, flow_id):
        """Packet count of an specific flow."""
        flow = self.flow_from_id(flow_id)
        if flow is None:
            return "Flow does not exist", 404
        packet_stats = {
            'flow_id': flow_id,
            'packet_counter': flow.packet_count,
            'packet_per_second': self._per_second(flow.packet_count,
                                                  flow.duration_sec)
            }
        return jsonify(packet_stats)

    @rest('bytes_count/<flow_id>')
    def bytes_count(self, flow_id):
        """Bytes count of an specific flow."""
        flow = self.flow_from_id(flow_id)
        if flow is None:
            return "Flow does not exist", 404
        bytes_stats = {
            'flow_id': flow_id,
            'bytes_counter': flow.byte_count,
            'bits_per_second': self._per_second(flow.byte_count * 8,
                                                flow.duration_sec)
            }
        return jsonify(bytes_stats)

    @rest('packet_count/per_flow/<dpid>')
    def packet_count_per_flow(self, dpid):
        """Per flow packet count."""
        return self.flows_counters('packet_count',
                                   dpid,
                                   counter='packet_counter',
                                   rate='packet_per_second')

    @rest('packet_count/sum/<dpid>')
    def packet_count_sum(self, dpid):
        """Sum of packet count flow stats."""
        return self.flows_counters('packet_count',
                                   dpid,
                                   total=True)

    @rest('bytes_count/per_flow/<dpid>')
    def bytes_count_per_flow(self, dpid):
        """Per flow bytes count."""
        return self.flows_counters('byte_count',
                                   dpid,
                                   counter='bytes_counter',
                                   rate='bits_per_second')

    @rest('bytes_count/sum/<dpid>')
    def bytes_count_sum(self, dpid):
        """Sum of bytes count flow stats."""
        return self.flows_counters('byte_count',
                                   dpid,
                                   total=True)

    def flows_counters(self, field, dpid, counter=None, rate=None,
                       total=False):
        """Calculate flows statistics.

        With ``total`` the response is the sum of ``field`` over the flows
        of the switch. Otherwise it is a list with one entry per flow,
        holding the flow id, the counter (under the ``counter`` key) and its
        per-second rate (under the ``rate`` key). A rate key starting with
        'bits' is multiplied by 8. Flows with a zero duration report a rate
        of 0. An unknown dpid yields a 404 response.
        """
        # pylint: disable=too-many-arguments
        # pylint: disable=unused-variable
        start_date = request.args.get('start_date', 0)
        end_date = request.args.get('end_date', 0)
        # pylint: enable=unused-variable

        if total:
            count_flows = 0
        else:
            count_flows = []
            if not counter:
                counter = field
            if not rate:
                rate = field

        # We don't have statistics persistence yet, so for now this only works
        # for start and end equals to zero
        switch = self.controller.get_switch_by_dpid(dpid)
        if not switch:
            return f"switch {dpid} not found", 404
        # A switch that has not been given a flow list has no flows.
        flows = getattr(switch, 'generic_flows', [])

        for flow in flows:
            count = getattr(flow, field)
            if total:
                count_flows += count
            else:
                per_second = self._per_second(count, flow.duration_sec)
                if rate.startswith('bits'):
                    per_second *= 8
                count_flows.append({'flow_id': flow.id,
                                    counter: count,
                                    rate: per_second})

        return jsonify(count_flows)

    @listen_to('kytos/of_core.v0x01.messages.in.ofpt_stats_reply')
    def on_stats_reply_0x01(self, event):
        """Capture flow stats messages for v0x01 switches."""
        self.handle_stats_reply_0x01(event)

    def handle_stats_reply_0x01(self, event):
        """Handle stats replies for v0x01 switches."""
        msg = event.content['message']
        # Only flow stats are processed; other stats types are ignored.
        if msg.body_type == common01.StatsType.OFPST_FLOW:
            switch = event.source.switch
            self.handle_stats_reply(msg, switch)

    @listen_to('kytos/of_core.v0x04.messages.in.ofpt_multipart_reply')
    def on_stats_reply_0x04(self, event):
        """Capture flow stats messages for OpenFlow 1.3."""
        self.handle_stats_reply_0x04(event)

    def handle_stats_reply_0x04(self, event):
        """Handle flow stats messages for OpenFlow 1.3."""
        msg = event.content['message']
        # Only flow multipart replies are processed; others are ignored.
        if msg.multipart_type == common04.MultipartType.OFPMP_FLOW:
            switch = event.source.switch
            self.handle_stats_reply(msg, switch)

    def handle_stats_reply(self, msg, switch):
        """Insert flows received in the switch list of flows.

        Flows are accumulated in ``switch.generic_new_flows`` while parts
        sharing one xid arrive. On the last part (even ``msg.flags``) they
        replace ``switch.generic_flows``, sorted by priority and duration in
        descending order. An 'amlight/flow_stats.flows_updated' event is put
        on the app buffer when a reply that is both new and last changed the
        flow list.
        """
        try:
            old_flows = switch.generic_flows
        except AttributeError:
            old_flows = []
        is_new_xid = (
            int(msg.header.xid) != self.switch_stats_xid.get(switch.id, 0)
        )
        is_last_part = msg.flags.value % 2 == 0
        self.switch_stats_lock.setdefault(switch.id, Lock())
        with self.switch_stats_lock[switch.id]:
            # Also start a fresh accumulator when none exists yet: a first
            # reply whose xid equals the default 0 is not seen as new.
            if is_new_xid or not hasattr(switch, 'generic_new_flows'):
                switch.generic_new_flows = []
                self.switch_stats_xid[switch.id] = int(msg.header.xid)
            for flow_stats in msg.body:
                flow = GenericFlow.from_flow_stats(flow_stats,
                                                   switch.ofp_version)
                switch.generic_new_flows.append(flow)
            if is_last_part:
                switch.generic_flows = switch.generic_new_flows
                switch.generic_flows.sort(
                    key=lambda f: (f.priority, f.duration_sec),
                    reverse=True
                )
        # NOTE(review): the last part of a multi-part reply shares the xid
        # of the first part, so is_new_xid is False there and no event is
        # emitted for multi-part replies - confirm this is intended.
        if is_new_xid and is_last_part and switch.generic_flows != old_flows:
            # Generate an event informing that flows have changed
            event = KytosEvent('amlight/flow_stats.flows_updated')
            event.content['switch'] = switch.dpid
            self.controller.buffers.app.put(event)
568