FlowSerializer13._action_from_dict()   B
last analyzed

Complexity

Conditions 7

Size

Total Lines 18
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 15
CRAP Score 7

Importance

Changes 0
Metric Value
eloc 17
dl 0
loc 18
ccs 15
cts 15
cp 1
rs 8
c 0
b 0
f 0
cc 7
nop 2
crap 7
1
"""Flow serializer for OF 1.3."""
2 1
from itertools import chain
3
4 1
from pyof.foundation.basic_types import HWAddress, IPAddress
5 1
from pyof.foundation.network_types import EtherType
6 1
from pyof.v0x04.common.action import (ActionOutput, ActionPopVLAN, ActionPush,
7
                                      ActionSetField, ActionType)
8 1
from pyof.v0x04.common.flow_instructions import InstructionApplyAction
9 1
from pyof.v0x04.common.flow_instructions import InstructionType as IType
10 1
from pyof.v0x04.common.flow_match import OxmOfbMatchField, OxmTLV, VlanId
11 1
from pyof.v0x04.common.port import PortNo
12 1
from pyof.v0x04.controller2switch.flow_mod import FlowMod
13
14 1
from napps.kytos.flow_manager.serializers.base import FlowSerializer
15
16
17
# pylint: disable=too-many-return-statements, inconsistent-return-statements
18 1
class FlowSerializer13(FlowSerializer):
19
    """Flow serializer for OpenFlow 1.3."""
20
21 1
    def __init__(self):
22
        """Initialize OF 1.3 specific variables."""
23 1
        super().__init__()
24 1
        self._match_names = {
25
            'in_port': OxmOfbMatchField.OFPXMT_OFB_IN_PORT,
26
            'dl_src': OxmOfbMatchField.OFPXMT_OFB_ETH_SRC,
27
            'dl_dst': OxmOfbMatchField.OFPXMT_OFB_ETH_DST,
28
            'dl_type': OxmOfbMatchField.OFPXMT_OFB_ETH_TYPE,
29
            'dl_vlan': OxmOfbMatchField.OFPXMT_OFB_VLAN_VID,
30
            'dl_vlan_pcp': OxmOfbMatchField.OFPXMT_OFB_VLAN_PCP,
31
            'nw_src': OxmOfbMatchField.OFPXMT_OFB_IPV4_SRC,
32
            'nw_dst': OxmOfbMatchField.OFPXMT_OFB_IPV4_DST,
33
            'nw_proto': OxmOfbMatchField.OFPXMT_OFB_IP_PROTO}
34
        # Invert match_values index
35 1
        self._match_values = {b: a for a, b in self._match_names.items()}
36
37 1
    def from_dict(self, dictionary):
38
        """Return an OF 1.0 FlowMod message from serialized dictionary."""
39 1
        flow_mod = FlowMod()
40 1
        instruction = InstructionApplyAction()
41 1
        flow_mod.instructions.append(instruction)
42
43 1
        for field, data in dictionary.items():
44 1
            if field in self.flow_attributes:
45 1
                setattr(flow_mod, field, data)
46 1
            elif field == 'match':
47 1
                tlvs = self._match_from_dict(data)
48 1
                flow_mod.match.oxm_match_fields.append(list(tlvs))
49 1
            elif field == 'actions':
50 1
                actions = self._actions_from_list(data)
51 1
                instruction.actions.extend(list(actions))
52
53 1
        return flow_mod
54
55 1
    def _match_from_dict(self, dictionary):
56 1
        known_fields = ((field, data) for field, data in dictionary.items()
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable field does not seem to be defined.
Loading history...
Comprehensibility Best Practice introduced by
The variable data does not seem to be defined.
Loading history...
57
                        if field in self._match_names)
58 1
        for field_name, data in known_fields:
59 1
            tlv = OxmTLV()
60 1
            tlv.oxm_field = self._match_names[field_name]
61
            # set oxm_value
62 1
            if field_name in ('dl_vlan_pcp', 'nw_proto'):
63 1
                tlv.oxm_value = data.to_bytes(1, 'big')
64 1
            elif field_name == 'dl_vlan':
65 1
                vid = data | VlanId.OFPVID_PRESENT
66 1
                tlv.oxm_value = vid.to_bytes(2, 'big')
67 1
            elif field_name in ('dl_src', 'dl_dst'):
68 1
                tlv.oxm_value = HWAddress(data).pack()
69 1
            elif field_name in ('nw_src', 'nw_dst'):
70 1
                tlv.oxm_value = IPAddress(data).pack()
71 1
            elif field_name == 'in_port':
72 1
                tlv.oxm_value = data.to_bytes(4, 'big')
73
            else:
74 1
                tlv.oxm_value = data.to_bytes(2, 'big')
75 1
            yield tlv
76
77 1
    @classmethod
78
    def _actions_from_list(cls, action_list):
79 1
        for action in action_list:
80 1
            new_action = cls._action_from_dict(action)
81 1
            if new_action:
82 1
                yield new_action
83
84 1
    @classmethod
85
    def _action_from_dict(cls, action):
86 1
        if action['action_type'] == 'set_vlan':
87 1
            tlv = cls._create_vlan_tlv(vlan_id=action['vlan_id'])
88 1
            return ActionSetField(field=tlv)
89 1
        elif action['action_type'] == 'output':
90 1
            if action['port'] == 'controller':
91 1
                return ActionOutput(port=PortNo.OFPP_CONTROLLER)
92 1
            return ActionOutput(port=action['port'])
93 1
        elif action['action_type'] == 'push_vlan':
94 1
            if action['tag_type'] == 's':
95 1
                ethertype = EtherType.VLAN_QINQ
96
            else:
97 1
                ethertype = EtherType.VLAN
98 1
            return ActionPush(action_type=ActionType.OFPAT_PUSH_VLAN,
99
                              ethertype=ethertype)
100 1
        elif action['action_type'] == 'pop_vlan':
101 1
            return ActionPopVLAN()
102
103 1
    @staticmethod
104
    def _create_vlan_tlv(vlan_id):
105 1
        tlv = OxmTLV()
106 1
        tlv.oxm_field = OxmOfbMatchField.OFPXMT_OFB_VLAN_VID
107 1
        oxm_value = vlan_id | VlanId.OFPVID_PRESENT
108 1
        tlv.oxm_value = oxm_value.to_bytes(2, 'big')
109 1
        return tlv
110
111 1
    def to_dict(self, flow_stats):
112
        """Return a dictionary created from input 1.3 switch's flows."""
113 1
        flow_dict = {field: data.value
114
                     for field, data in vars(flow_stats).items()
115
                     if field in self.flow_attributes}
116 1
        flow_dict['match'] = self._match_to_dict(flow_stats)
117 1
        flow_dict['actions'] = self._actions_to_list(flow_stats)
118 1
        return flow_dict
119
120 1
    def _match_to_dict(self, flow_stats):
121 1
        match_dict = {}
122 1
        fields = (field for field in flow_stats.match.oxm_match_fields
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable field does not seem to be defined.
Loading history...
123
                  if field.oxm_field in self._match_values)
124 1
        for field in fields:
125 1
            match_field = self._match_values[field.oxm_field]
126 1
            if match_field == 'dl_vlan':
127 1
                data = int.from_bytes(field.oxm_value, 'big') & 4095
128 1
            elif match_field in ('dl_src', 'dl_dst'):
129 1
                addr = HWAddress()
130 1
                addr.unpack(field.oxm_value)
131 1
                data = str(addr)
132 1
            elif match_field in ('nw_src', 'nw_dst'):
133 1
                addr = IPAddress()
134 1
                addr.unpack(field.oxm_value)
135 1
                data = str(addr)
136
            else:
137 1
                data = int.from_bytes(field.oxm_value, 'big')
138 1
            match_dict[match_field] = data
139 1
        return match_dict
140
141 1
    def _actions_to_list(self, flow_stats):
142 1
        actions_list = []
143 1
        for action in self._filter_actions(flow_stats):
144 1
            action_dict = self._action_to_dict(action)
145 1
            actions_list.append(action_dict)
146 1
        return actions_list
147
148 1
    @staticmethod
149
    def _action_to_dict(action):
150 1
        if action.action_type == ActionType.OFPAT_SET_FIELD:
151 1
            if action.field.oxm_field == OxmOfbMatchField.OFPXMT_OFB_VLAN_VID:
152 1
                data = int.from_bytes(action.field.oxm_value, 'big') & 4095
153 1
                return {'action_type': 'set_vlan', 'vlan_id': data}
154 1
        elif action.action_type == ActionType.OFPAT_OUTPUT:
155 1
            if action.port == PortNo.OFPP_CONTROLLER:
156 1
                return {'action_type': 'output', 'port': 'controller'}
157 1
            return {'action_type': 'output', 'port': action.port.value}
158 1
        elif action.action_type == ActionType.OFPAT_PUSH_VLAN:
159 1
            if action.ethertype == EtherType.VLAN_QINQ:
160 1
                return {'action_type': 'push_vlan', 'tag_type': 's'}
161 1
            return {'action_type': 'push_vlan', 'tag_type': 'c'}
162 1
        elif action.action_type == ActionType.OFPAT_POP_VLAN:
163 1
            return {'action_type': 'pop_vlan'}
164 1
        return {}
165
166 1
    @staticmethod
167
    def _filter_actions(flow_stats):
168
        """Filter instructions and return a list of their actions."""
169 1
        instructions = (inst for inst in flow_stats.instructions
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable inst does not seem to be defined.
Loading history...
170
                        if inst.instruction_type == IType.OFPIT_APPLY_ACTIONS)
171 1
        action_lists = (instruction.actions for instruction in instructions)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable instruction does not seem to be defined.
Loading history...
172
        # Make a list from a list of lists
173
        return chain.from_iterable(action_lists)
174