Passed
Push — master ( c86d28...05d6fc )
by Humberto
01:30 queued 11s
created

FlowSerializer10.__init__()   A

Complexity

Conditions 1

Size

Total Lines 13
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
eloc 12
dl 0
loc 13
ccs 3
cts 3
cp 1
rs 9.8
c 0
b 0
f 0
cc 1
nop 1
crap 1
1
"""Flow serializer for OF 1.0."""
2 1
from pyof.v0x01.common.action import ActionOutput, ActionType, ActionVlanVid
3 1
from pyof.v0x01.common.phy_port import Port
4 1
from pyof.v0x01.controller2switch.flow_mod import FlowMod
5
6 1
from napps.kytos.flow_manager.serializers.base import FlowSerializer
7
8
9 1
class FlowSerializer10(FlowSerializer):
10
    """Flow serializer for OpenFlow 1.0."""
11
12 1
    def __init__(self):
13
        """Initialize OF 1.0 specific variables."""
14 1
        super().__init__()
15 1
        self.match_attributes = set((
16
            'in_port',
17
            'dl_src',
18
            'dl_dst',
19
            'dl_type',
20
            'dl_vlan',
21
            'dl_vlan_pcp',
22
            'nw_src',
23
            'nw_dst',
24
            'nw_proto'))
25
26 1
    def from_dict(self, dictionary):
27
        """Return an OF 1.0 FlowMod message from serialized dictionary."""
28 1
        flow_mod = FlowMod()
29 1
        for field, data in dictionary.items():
30 1
            if field in self.flow_attributes:
31 1
                setattr(flow_mod, field, data)
32 1
            elif field == 'match':
33 1
                self._update_match(flow_mod.match, data)
34 1
            elif field == 'actions':
35 1
                actions = self._actions_from_list(data)
36 1
                flow_mod.actions.extend(actions)
37 1
        return flow_mod
38
39 1
    def _update_match(self, match, dictionary):
40
        """Update match attributes found in dictionary."""
41 1
        for field, data in dictionary.items():
42 1
            if field in self.match_attributes:
43 1
                setattr(match, field, data)
44
45 1
    @staticmethod
46
    def _actions_from_list(action_list):
47
        """Return actions found in the action list."""
48 1
        actions = []
49 1
        for action in action_list:
50 1
            if action['action_type'] == 'set_vlan':
51 1
                new_action = ActionVlanVid(vlan_id=action['vlan_id'])
52 1
            elif action['action_type'] == 'output':
53 1
                if action['port'] == 'controller':
54 1
                    new_action = ActionOutput(port=Port.OFPP_CONTROLLER)
55
                else:
56 1
                    new_action = ActionOutput(port=action['port'])
57
            else:
58
                continue
59 1
            actions.append(new_action)
60 1
        return actions
61
62 1
    def to_dict(self, flow_stats):
63
        """Return a dictionary created from OF 1.0 FlowStats."""
64 1
        flow_dict = {field: data.value
65
                     for field, data in vars(flow_stats).items()
66
                     if field in self.flow_attributes}
67
68 1
        match_dict = {}
69 1
        for field, data in vars(flow_stats.match).items():
70 1
            if field in self.match_attributes:
71 1
                match_dict[field] = data.value
72
73 1
        actions_list = []
74 1
        for action in flow_stats.actions:
75 1
            if action.action_type == ActionType.OFPAT_SET_VLAN_VID:
76 1
                actions_list.append({'action_type': 'set_vlan',
77
                                     'vlan_id': action.vlan_id.value})
78 1
            elif action.action_type == ActionType.OFPAT_OUTPUT:
79 1
                if action.port == Port.OFPP_CONTROLLER:
80 1
                    actions_list.append({'action_type': 'output',
81
                                         'port': 'controller'})
82
                else:
83 1
                    actions_list.append({'action_type': 'output',
84
                                         'port': action.port.value})
85
86 1
        flow_dict.update({'match': match_dict, 'actions': actions_list})
87
        return flow_dict
88