Passed
Pull Request — master (#435)
by Carlos Eduardo
02:34
created

new_message_from_header()   B

Complexity

Conditions 5

Size

Total Lines 31

Duplication

Lines 30
Ratio 96.77 %

Code Coverage

Tests 1
CRAP Score 25.0182

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 30
loc 31
rs 8.0894
cc 5
ccs 1
cts 14
cp 0.0714
crap 25.0182
1
"""Helper python-openflow functions."""
2
3
# System imports
4
5
# Third-party imports
6
7
# Local source tree imports
8
# Importing asynchronous messages
9 1
from pyof.v0x01.asynchronous.error_msg import ErrorMsg
10 1
from pyof.v0x01.asynchronous.flow_removed import FlowRemoved
11 1
from pyof.v0x01.asynchronous.packet_in import PacketIn
12 1
from pyof.v0x01.asynchronous.port_status import PortStatus
13
# Importing controller2switch messages
14 1
from pyof.v0x01.common.header import Header, Type
15 1
from pyof.v0x01.controller2switch.barrier_reply import BarrierReply
16 1
from pyof.v0x01.controller2switch.barrier_request import BarrierRequest
17 1
from pyof.v0x01.controller2switch.features_reply import FeaturesReply
18 1
from pyof.v0x01.controller2switch.features_request import FeaturesRequest
19 1
from pyof.v0x01.controller2switch.flow_mod import FlowMod
20 1
from pyof.v0x01.controller2switch.get_config_reply import GetConfigReply
21 1
from pyof.v0x01.controller2switch.get_config_request import GetConfigRequest
22 1
from pyof.v0x01.controller2switch.packet_out import PacketOut
23 1
from pyof.v0x01.controller2switch.port_mod import PortMod
24 1
from pyof.v0x01.controller2switch.queue_get_config_reply import (
25
    QueueGetConfigReply)
26 1
from pyof.v0x01.controller2switch.queue_get_config_request import (
27
    QueueGetConfigRequest)
28 1
from pyof.v0x01.controller2switch.set_config import SetConfig
29 1
from pyof.v0x01.controller2switch.stats_reply import StatsReply
30 1
from pyof.v0x01.controller2switch.stats_request import StatsRequest
31
# Importing symmetric messages
32 1
from pyof.v0x01.symmetric.echo_reply import EchoReply
33 1
from pyof.v0x01.symmetric.echo_request import EchoRequest
34 1
from pyof.v0x01.symmetric.hello import Hello
35 1
from pyof.v0x01.symmetric.vendor_header import VendorHeader
36
37 1
__all__ = ('new_message_from_header', 'new_message_from_message_type',
38
           'unpack_message')
39
40
41 1
def new_message_from_message_type(message_type):
42
    """Given an OpenFlow Message Type, return an empty message of that type.
43
44
    Args:
45
        messageType (:class:`~pyof.v0x01.common.header.Type`):
46
            Python-openflow message.
47
48
    Returns:
49
        Empty OpenFlow message of the requested message type.
50
51
    Raises:
52
        KytosUndefinedMessageType: Unkown Message_Type.
53
    """
54
    message_type = str(message_type)
55
56
    available_classes = {
57
        str(Type.OFPT_HELLO): Hello,
58
        str(Type.OFPT_ERROR): ErrorMsg,
59
        str(Type.OFPT_ECHO_REQUEST): EchoRequest,
60
        str(Type.OFPT_ECHO_REPLY): EchoReply,
61
        str(Type.OFPT_VENDOR): VendorHeader,
62
        str(Type.OFPT_FEATURES_REQUEST): FeaturesRequest,
63
        str(Type.OFPT_FEATURES_REPLY): FeaturesReply,
64
        str(Type.OFPT_GET_CONFIG_REQUEST): GetConfigRequest,
65
        str(Type.OFPT_GET_CONFIG_REPLY): GetConfigReply,
66
        str(Type.OFPT_SET_CONFIG): SetConfig,
67
        str(Type.OFPT_PACKET_IN): PacketIn,
68
        str(Type.OFPT_FLOW_REMOVED): FlowRemoved,
69
        str(Type.OFPT_PORT_STATUS): PortStatus,
70
        str(Type.OFPT_PACKET_OUT): PacketOut,
71
        str(Type.OFPT_FLOW_MOD): FlowMod,
72
        str(Type.OFPT_PORT_MOD): PortMod,
73
        str(Type.OFPT_STATS_REQUEST): StatsRequest,
74
        str(Type.OFPT_STATS_REPLY): StatsReply,
75
        str(Type.OFPT_BARRIER_REQUEST): BarrierRequest,
76
        str(Type.OFPT_BARRIER_REPLY): BarrierReply,
77
        str(Type.OFPT_QUEUE_GET_CONFIG_REQUEST): QueueGetConfigRequest,
78
        str(Type.OFPT_QUEUE_GET_CONFIG_REPLY): QueueGetConfigReply
79
    }
80
81
    if message_type not in available_classes:
82
        raise ValueError('"{}" is not known.'.format(message_type))
83
84
    message_class = available_classes.get(message_type)
85
    message_instance = message_class()
86
87
    return message_instance
88
89 View Code Duplication
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
90 1
def new_message_from_header(header):
91
    """Given an OF Header, return an empty message of header's message_type.
92
93
    Args:
94
        header (~pyof.v0x01.common.header.Header): Unpacked OpenFlow Header.
95
96
    Returns:
97
        Empty OpenFlow message of the same type of message_type attribute from
98
        the given header.
99
        The header attribute of the message will be populated.
100
101
    Raises:
102
        KytosUndefinedMessageType: Unkown Message_Type.
103
    """
104
    message_type = header.message_type
105
    if not isinstance(message_type, Type):
106
        try:
107
            if isinstance(message_type, str):
108
                # False positive https://github.com/PyCQA/pylint/issues/992
109
                # pylint: disable=E1136
110
                message_type = Type[message_type]
111
            elif isinstance(message_type, int):
112
                message_type = Type(message_type)
113
        except ValueError:
114
            raise ValueError
115
116
    message = new_message_from_message_type(message_type)
117
    message.header.xid = header.xid
118
    message.header.length = header.length
119
120
    return message
121
122
123 1
def unpack_message(buffer):
124
    """Unpack the whole buffer, including header pack.
125
126
    Args:
127
        buffer (bytes): Bytes representation of a openflow message.
128
    Returns:
129
        object: Instance of openflow message.
130
    """
131
    hdr_size = Header().get_size()
132
    hdr_buff, msg_buff = buffer[:hdr_size], buffer[hdr_size:]
133
    header = Header()
134
    header.unpack(hdr_buff)
135
    message = new_message_from_header(header)
136
    message.unpack(msg_buff)
137
    return message
138