Passed
Pull Request — master (#444)
by macartur
05:47
created

validate_packet()   C

Complexity

Conditions 7

Size

Total Lines 21

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 7.9936

Importance

Changes 2
Bugs 1 Features 1
Metric Value
c 2
b 1
f 1
dl 0
loc 21
rs 6.4705
cc 7
ccs 8
cts 11
cp 0.7272
crap 7.9936
1
"""General Unpack utils for python-openflow.
2
3
This package was moved from kytos/of_core for the purpose of creating a generic
4
method to perform packet unpacking independent of the OpenFlow version.
5
"""
6 1
from pyof import v0x01, v0x04
7 1
from pyof.foundation.exceptions import UnpackException
8
9 1
# Maps the version byte found at the start of a packet to the pyof
# sub-package used to unpack it.
PYOF_VERSION_LIBS = {
    0x01: v0x01,
    0x04: v0x04,
}
11
12
13 1
def validate_packet(packet):
    """Check if packet is valid OF packet.

    A packet is accepted only when all of the following hold:

    * it is a ``bytes`` object;
    * its size is between 8 bytes (the fixed OpenFlow header size) and
      0xFFFF bytes, the largest value the two-byte length field can hold;
    * its size equals the big-endian integer stored in bytes 2-3;
    * its first byte (the version) is in the range 1..127.

    Args:
        packet (bytes): Raw packet to be checked.

    Raises:
        UnpackException: If the packet is invalid.
    """
    if not isinstance(packet, bytes):
        raise UnpackException('invalid packet')

    packet_length = len(packet)

    # The length field is read from packet[2:4] (two bytes), so a valid
    # packet can never be larger than 0xFFFF bytes.
    if not 8 <= packet_length <= 0xFFFF:
        raise UnpackException('invalid packet')

    # The size announced in the header must match the actual size.
    if packet_length != int.from_bytes(packet[2:4], byteorder='big'):
        raise UnpackException('invalid packet')

    version = packet[0]
    if version == 0 or version >= 128:
        raise UnpackException('invalid packet')
34
35
36 1
def unpack(packet):
    """Unpack the OpenFlow Packet and returns a message.

    The packet is first checked with ``validate_packet``. Its first byte is
    then used as the key into ``PYOF_VERSION_LIBS`` to pick the library that
    unpacks the header and, when a body is present, the message body.

    Args:
        packet (bytes): Raw OpenFlow packet.

    Returns:
        GenericMessage: Message unpacked based on openflow packet.

    Raises:
        UnpackException: if the packet can't be unpacked.
    """
    validate_packet(packet)

    version = packet[0]
    try:
        pyof_lib = PYOF_VERSION_LIBS[version]
    except KeyError:
        # The KeyError carries no extra information for the caller, so it
        # is deliberately suppressed from the traceback.
        raise UnpackException('Version not supported') from None

    try:
        header = pyof_lib.common.header.Header()
        header.unpack(packet[:header.get_size()])
        message = pyof_lib.common.utils.new_message_from_header(header)

        header_size = header.get_size()
        binary_data = packet[header_size:]
        binary_data_size = header.length - header_size

        # Only unpack a body when there is one and its size agrees with
        # the length announced in the header.
        if binary_data and len(binary_data) == binary_data_size:
            message.unpack(binary_data)
        return message
    except (UnpackException, ValueError) as exception:
        # Keep the original error attached as the explicit cause.
        raise UnpackException(exception) from exception
66