Passed
Pull Request — master (#69)
by Humberto
01:53
created

FlowSerializer13._create_vlan_tlv()   A

Complexity

Conditions 1

Size

Total Lines 7
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
eloc 7
dl 0
loc 7
rs 10
c 0
b 0
f 0
ccs 0
cts 6
cp 0
cc 1
nop 1
crap 2
1
"""Flow serializer for OF 1.3."""
2
from itertools import chain
3
4
from pyof.foundation.basic_types import HWAddress, IPAddress
5
from pyof.foundation.network_types import EtherType
6
from pyof.v0x04.common.action import (ActionOutput, ActionPopVLAN, ActionPush,
7
                                      ActionSetField, ActionType)
8
from pyof.v0x04.common.flow_instructions import InstructionApplyAction
9
from pyof.v0x04.common.flow_instructions import InstructionType as IType
10
from pyof.v0x04.common.flow_match import OxmOfbMatchField, OxmTLV, VlanId
11
from pyof.v0x04.common.port import PortNo
12
from pyof.v0x04.controller2switch.flow_mod import FlowMod
13
14
from napps.kytos.flow_manager.serializers.base import FlowSerializer
15
16
17
class FlowSerializer13(FlowSerializer):
18
    """Flow serializer for OpenFlow 1.3."""
19
20
    def __init__(self):
        """Build the lookup tables between match names and OXM fields."""
        super().__init__()
        ofb = OxmOfbMatchField
        # Serialized (dictionary) match name -> OXM basic match field.
        self._match_names = dict(
            in_port=ofb.OFPXMT_OFB_IN_PORT,
            dl_src=ofb.OFPXMT_OFB_ETH_SRC,
            dl_dst=ofb.OFPXMT_OFB_ETH_DST,
            dl_type=ofb.OFPXMT_OFB_ETH_TYPE,
            dl_vlan=ofb.OFPXMT_OFB_VLAN_VID,
            dl_vlan_pcp=ofb.OFPXMT_OFB_VLAN_PCP,
            nw_src=ofb.OFPXMT_OFB_IPV4_SRC,
            nw_dst=ofb.OFPXMT_OFB_IPV4_DST,
            nw_proto=ofb.OFPXMT_OFB_IP_PROTO,
        )
        # Reverse lookup: OXM basic match field -> serialized match name.
        self._match_values = {
            oxm_field: name for name, oxm_field in self._match_names.items()
        }
35
36
    def from_dict(self, dictionary):
        """Return an OF 1.3 FlowMod message from serialized dictionary.

        Keys found in ``self.flow_attributes`` are copied onto the FlowMod
        as attributes, ``'match'`` is converted into OXM TLVs and
        ``'actions'`` is converted into actions that are added to a single
        apply-actions instruction. Any other key is ignored.

        Args:
            dictionary (dict): Serialized flow.

        Returns:
            FlowMod: Message populated from ``dictionary``.
        """
        flow_mod = FlowMod()
        # The instruction is attached up front, so it is present even when
        # the dictionary carries no 'actions' key.
        instruction = InstructionApplyAction()
        flow_mod.instructions.append(instruction)

        for field, data in dictionary.items():
            if field in self.flow_attributes:
                setattr(flow_mod, field, data)
            elif field == 'match':
                tlvs = self._match_from_dict(data)
                # extend() adds each TLV individually, mirroring how the
                # actions are added below, instead of appending one list.
                flow_mod.match.oxm_match_fields.extend(list(tlvs))
            elif field == 'actions':
                actions = self._actions_from_list(data)
                instruction.actions.extend(list(actions))

        return flow_mod
53
54
    def _match_from_dict(self, dictionary):
        """Yield one OxmTLV per known match field found in ``dictionary``.

        Keys that are not in ``self._match_names`` are skipped.
        """
        for name, value in dictionary.items():
            if name not in self._match_names:
                continue

            # Encode the value according to the field it belongs to.
            if name in ('dl_src', 'dl_dst'):
                packed = HWAddress(value).pack()
            elif name in ('nw_src', 'nw_dst'):
                packed = IPAddress(value).pack()
            elif name == 'dl_vlan':
                # The VLAN id is sent with the OFPVID_PRESENT bit set.
                packed = (value | VlanId.OFPVID_PRESENT).to_bytes(2, 'big')
            elif name in ('dl_vlan_pcp', 'nw_proto'):
                packed = value.to_bytes(1, 'big')
            elif name == 'in_port':
                packed = value.to_bytes(4, 'big')
            else:
                # Remaining known fields are encoded on two bytes.
                packed = value.to_bytes(2, 'big')

            tlv = OxmTLV()
            tlv.oxm_field = self._match_names[name]
            tlv.oxm_value = packed
            yield tlv
75
76
    @classmethod
    def _actions_from_list(cls, action_list):
        """Yield the action built from each item of ``action_list``.

        Items for which ``_action_from_dict`` returns a falsy value are
        left out.
        """
        converted = map(cls._action_from_dict, action_list)
        # filter(None, ...) keeps only truthy results.
        yield from filter(None, converted)
82
83
    @classmethod
    def _action_from_dict(cls, action):
        """Return the action object described by the ``action`` dictionary.

        Handles the 'set_vlan', 'output', 'push_vlan' and 'pop_vlan' values
        of ``action['action_type']``; returns None for any other value.
        """
        kind = action['action_type']

        if kind == 'set_vlan':
            vlan_tlv = cls._create_vlan_tlv(vlan_id=action['vlan_id'])
            return ActionSetField(field=vlan_tlv)

        if kind == 'output':
            port = action['port']
            # The string 'controller' stands for the controller port.
            if port == 'controller':
                port = PortNo.OFPP_CONTROLLER
            return ActionOutput(port=port)

        if kind == 'push_vlan':
            # Tag type 's' selects the QinQ ethertype, anything else VLAN.
            is_service_tag = action['tag_type'] == 's'
            ethertype = (EtherType.VLAN_QINQ if is_service_tag
                         else EtherType.VLAN)
            return ActionPush(action_type=ActionType.OFPAT_PUSH_VLAN,
                              ethertype=ethertype)

        if kind == 'pop_vlan':
            return ActionPopVLAN()

        return None
101
102
    @staticmethod
    def _create_vlan_tlv(vlan_id):
        """Return a VLAN_VID OxmTLV holding ``vlan_id``.

        The id is OR-ed with ``VlanId.OFPVID_PRESENT`` and stored as two
        big-endian bytes.
        """
        flagged_vid = vlan_id | VlanId.OFPVID_PRESENT
        vlan_tlv = OxmTLV()
        vlan_tlv.oxm_field = OxmOfbMatchField.OFPXMT_OFB_VLAN_VID
        vlan_tlv.oxm_value = flagged_vid.to_bytes(2, 'big')
        return vlan_tlv
109
110
    def to_dict(self, flow_stats):
        """Return a dictionary built from an OF 1.3 switch's flow stats.

        Attributes of ``flow_stats`` named in ``self.flow_attributes`` are
        copied through their ``value``; the 'match' and 'actions' keys are
        then filled in from the match fields and the instructions.
        """
        flow_dict = {}
        for name, attribute in vars(flow_stats).items():
            if name in self.flow_attributes:
                flow_dict[name] = attribute.value
        flow_dict['match'] = self._match_to_dict(flow_stats)
        flow_dict['actions'] = self._actions_to_list(flow_stats)
        return flow_dict
118
119
    def _match_to_dict(self, flow_stats):
        """Return a dict of the known OXM match fields of ``flow_stats``.

        Fields whose ``oxm_field`` is not in ``self._match_values`` are
        skipped.
        """
        # Match names whose values are addresses, with the type to unpack.
        address_types = {'dl_src': HWAddress, 'dl_dst': HWAddress,
                         'nw_src': IPAddress, 'nw_dst': IPAddress}
        match_dict = {}
        for tlv in flow_stats.match.oxm_match_fields:
            if tlv.oxm_field not in self._match_values:
                continue
            name = self._match_values[tlv.oxm_field]
            address_type = address_types.get(name)
            if address_type is not None:
                address = address_type()
                address.unpack(tlv.oxm_value)
                value = str(address)
            else:
                value = int.from_bytes(tlv.oxm_value, 'big')
                if name == 'dl_vlan':
                    # Keep only the low 12 bits of the stored value.
                    value &= 4095
            match_dict[name] = value
        return match_dict
139
140
    def _actions_to_list(self, flow_stats):
        """Return a list with one dictionary per filtered action.

        Every action returned by ``_filter_actions`` is converted with
        ``_action_to_dict``; empty dictionaries are kept in the result.
        """
        return [self._action_to_dict(action)
                for action in self._filter_actions(flow_stats)]
146
147
    @staticmethod
    def _action_to_dict(action):
        """Return the dictionary form of ``action``.

        Covers set-field on the VLAN id, output, push-VLAN and pop-VLAN
        actions; any other action gives an empty dictionary.
        """
        kind = action.action_type

        if kind == ActionType.OFPAT_SET_FIELD:
            tlv = action.field
            if tlv.oxm_field != OxmOfbMatchField.OFPXMT_OFB_VLAN_VID:
                # Only VLAN id set-field actions are serialized.
                return {}
            # Keep only the low 12 bits of the stored value.
            vlan_id = int.from_bytes(tlv.oxm_value, 'big') & 4095
            return {'action_type': 'set_vlan', 'vlan_id': vlan_id}

        if kind == ActionType.OFPAT_OUTPUT:
            if action.port == PortNo.OFPP_CONTROLLER:
                port = 'controller'
            else:
                port = action.port.value
            return {'action_type': 'output', 'port': port}

        if kind == ActionType.OFPAT_PUSH_VLAN:
            is_qinq = action.ethertype == EtherType.VLAN_QINQ
            return {'action_type': 'push_vlan',
                    'tag_type': 's' if is_qinq else 'c'}

        if kind == ActionType.OFPAT_POP_VLAN:
            return {'action_type': 'pop_vlan'}

        return {}
164
165
    @staticmethod
    def _filter_actions(flow_stats):
        """Return an iterator over the actions of apply-actions instructions.

        Instructions of ``flow_stats`` whose type is not
        ``OFPIT_APPLY_ACTIONS`` are ignored; the action lists of the
        remaining ones are chained into one flat iterator.
        """
        return chain.from_iterable(
            instruction.actions
            for instruction in flow_stats.instructions
            if instruction.instruction_type == IType.OFPIT_APPLY_ACTIONS)
173