Completed
Push — master ( e34524...3efb57 )
by
unknown
10:14
created

ElementVariable.unpack()   B

Complexity

Conditions 5

Size

Total Lines 26

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 17
CRAP Score 5

Importance

Changes 3
Bugs 0 Features 0
Metric Value
cc 5
c 3
b 0
f 0
dl 0
loc 26
ccs 17
cts 17
cp 1
crap 5
rs 8.0894
1
"""
2
The variable NamedStruct element class.
3
4 1
Can be used in multiple ways ways:
5
6 1
1: Variable Lengths, in terms of namedstruct elements
7 1
8 1
    .. code-block:: python
9
10
        ExampleMessage = Message('VarTest', [('x', 'B'), ('y', 'B')])
11 1
        message_struct = [
12 1
            ('length_in_objects', 'H', 'vardata'),            # length field
13
            ('vardata', ExampleMessage, 'length_in_objects')  # variable length data
14
        ]
15
16
    The length is the string and you can think of it as "linking" to the
17
    length that is provided in the length field.
18
19
    .. note:: The length item is specified as a string, not as bytes
20
21
2: Variable lengths, in terms of byte size
22
23
    .. code-block:: python
24
25
        SomeMessage = namedstruct.Message(...)
26
        message_struct = [
27
            (b'length_in_bytes', 'B', 'vardata'),
28
            ('vardata', SomeMessage, b'length_in_bytes'),
29
        ]
30
31
    Now if our program specifies taht we should have a length in bytes field
32
    we can say 'length_in_bytes' = 8, while only have 2 SomeMessage, (assuming
33
    that the length of SomeMessge == 4).
34
35
    .. note:: The length item is specified as bytes, not as a string
36
37
3: Fixed length, in terms of namedstruct elements
38
39
    .. code-block:: python
40
41
        RepeatedMessage = Message('Repeated', [('x', 'B'), ('y', 'H')])
42
        message_struct = [
43
            ('repeated_data', RepeatedMessage, 3),
44
        ]
45
46
    Now we provide an integer that tells us that there will ALWAYS be that
47
    many messages in this message. You also no longer need to have another
48
    field that specifies the number of these messages.
49
50
4: Fixed length, in terms of bytes?
51
    TODO: write this
52
    Might have something that can only fit a certain number of bytes, like a
53
    CAN message, and this would break it up automatically?
54
"""
55
# pylint: disable=line-too-long
56
57
import struct
58
59
from typing import Optional
0 ignored issues
show
Configuration introduced by
The import typing could not be resolved.

This can be caused by one of the following:

1. Missing Dependencies

This error could indicate a configuration issue of Pylint. Make sure that your libraries are available by adding the necessary commands.

# .scrutinizer.yml
before_commands:
    - sudo pip install abc # Python2
    - sudo pip3 install abc # Python3
Tip: We are currently not using virtualenv to run pylint, when installing your modules make sure to use the command for the correct version.

2. Missing __init__.py files

This error could also result from missing __init__.py files in your module folders. Make sure that you place one file in each sub-folder.

Loading history...
60
61
import starstruct
62
from starstruct.element import register, Element
63
from starstruct.modes import Mode
64
65
66
@register
67
class ElementVariable(Element):
68
    """
69 1
    Initialize a StarStruct element object.
70
71
    :param field: The fields passed into the constructor of the element
72
    :param mode: The mode in which to pack the bytes
73
    :param alignment: Number of bytes to align to
74 1
    """
75 1 View Code Duplication
    def __init__(self, field: list, mode: Optional[Mode]=Mode.Native, alignment: Optional[int]=1):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
76
        # All of the type checks have already been performed by the class
77
        # factory
78
        self.name = field[0]
79
        self.ref = field[2]
80 1
81
        # Variable elements don't use the normal struct format, the format is
82
        # a StarStruct.Message object, but change the mode to match the
83 1
        # current mode.
84 1
        self.format = field[1]
85
86
        # Set the packing style for the struct
87 1
        if isinstance(self.ref, (str, bytes)):
88 1
            self.variable_repeat = True
89
90 1
            # Determine whether bytes or objects are the measurement tool
91
            if isinstance(self.ref, str):
92
                self.object_length = True
93 1
            else:
94
                self.object_length = False
95 1
96
                # Change our ref to be a string, for NamedTuple
97
                self.ref = self.ref.decode('utf-8')
98 1
        else:
99
            self.variable_repeat = False
100 1
101
            # TODO: If we add #4, then we would have to have a check here
102 1
            self.object_length = True
103
104
        self.update(mode, alignment)
105
106
    @staticmethod
107
    def valid(field: tuple) -> bool:
108
        """
109
        See :py:func:`starstruct.element.Element.valid`
110
111 1
        :param field: The items to determine the structure of the element
112
        """
113
        return len(field) == 3 \
114
            and isinstance(field[1], starstruct.message.Message) \
115 1
            and isinstance(field[2], (str, int, bytes))
116
117
    def validate(self, msg):
118
        """
119
        Ensure that the supplied message contains the required information for
120
        this element object to operate.
121
122 1
        All elements that are Variable must reference valid Length elements.
123 1
        """
124
        from starstruct.elementlength import ElementLength
125 1
        if self.variable_repeat:
126 1
            # Handle object length, not byte length
127
            if self.object_length:
128
                if not isinstance(msg[self.ref], ElementLength):
129 1
                    err = 'variable field {} reference {} invalid type'
130
                    raise TypeError(err.format(self.name, self.ref))
131
                elif not msg[self.ref].ref == self.name:
132
                    err = 'variable field {} reference {} mismatch'
133
                    raise TypeError(err.format(self.name, self.ref))
134
            # Handle byte length, not object length
135 1
            else:
136
                # TODO: Validate the object
137 1
                pass
138
        else:
139
            if not isinstance(self.ref, int):
140
                err = 'fixed repetition field {} reference {} not an integer'
141 1
                raise TypeError(err.format(self.name, self.ref))
142
143 1
    def update(self, mode=None, alignment=None):
144 1
        """change the mode of the struct format"""
145 1
        self._mode = mode
146
        self._alignment = alignment
147 1
        self.format.update(mode, alignment)
148
149
    def pack(self, msg):
150
        """Pack the provided values into the supplied buffer."""
151
        # When packing use the length of the current element to determine
152 1
        # how many elements to pack, not the length element of the message
153 1
        # (which should not be specified manually).
154 1
        if self.variable_repeat:
155
            if self.object_length:
156
                ret = [self.format.pack(dict(elem)) if elem else self.format.pack({})
157 1
                       for elem in msg[self.name]]
158 1
            else:
159
                ret = []
160 1
                length = 0
161 1
162
                for elem in msg[self.name]:
163 1
                    temp_elem = self.format.pack(dict(elem))
164 1
165
                    if length + len(temp_elem) <= msg[self.ref]:
166
                        ret.append(temp_elem)
167
168
        # Pack as many bytes as we have been given
169 1
        # and fill the rest of the byets with empty packing
170 1
        else:
171
            empty_byte = struct.pack('x')
172
            ret = [self.format.pack(msg[self.name][index]) if index < len(msg[self.name]) else empty_byte * len(self.format)
173
                   for index in range(self.ref)]
174
175
        # There is no need to make sure that the packed data is properly
176 1
        # aligned, because that should already be done by the individual
177
        # messages that have been packed.
178 1
        return b''.join(ret)
179
180
    def unpack(self, msg, buf):
181
        """Unpack data from the supplied buffer using the initialized format."""
182 1
        # When unpacking a variable element, reference the already unpacked
183 1
        # length field to determine how many elements need unpacked.
184 1
        ret = []
185 1
        unused = buf
186 1
        if self.object_length:
187 1
            if self.variable_repeat:
188
                msg_range = getattr(msg, self.ref)
189 1
            else:
190 1
                msg_range = self.ref
191 1
192 1
            for _ in range(msg_range):
193 1
                (val, unused) = self.format.unpack_partial(unused)
194
                ret.append(val)
195
        else:
196
            length = 0
197
            while length < getattr(msg, self.ref):
198 1
                (val, unused) = self.format.unpack_partial(unused)
199
                length += len(val)
200 1
                ret.append(val)
201
202 1
        # There is no need to make sure that the unpacked data consumes a
203 1
        # properly aligned number of bytes because that should already be done
204 1
        # by the individual messages that have been unpacked.
205 1
        return (ret, unused)
206
207
    def make(self, msg):
208
        """Return the expected "made" value"""
209
        ret = []
210
        for val in msg[self.name]:
211
            ret.append(self.format.make(val))
212
        return ret
213