Completed
Pull Request — master (#42)
by TJ
01:41
created

ElementVariable.validate()   B

Complexity

Conditions 6

Size

Total Lines 25

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 8.8343

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 6
c 1
b 0
f 0
dl 0
loc 25
ccs 8
cts 14
cp 0.5714
crap 8.8343
rs 7.5384
1
"""
2
The variable NamedStruct element class.
3
4
Can be used in multiple ways:
5
6
1: Variable Lengths, in terms of namedstruct elements
7
8
    .. code-block:: python
9
10
        ExampleMessage = Message('VarTest', [('x', 'B'), ('y', 'B')])
11
        message_struct = [
12
            ('length_in_objects', 'H', 'vardata'),            # length field
13
            ('vardata', ExampleMessage, 'length_in_objects')  # variable length data
14
        ]
15
16
    The length is the string and you can think of it as "linking" to the
17
    length that is provided in the length field.
18
19
    .. note:: The length item is specified as a string, not as bytes
20
21
2: Variable lengths, in terms of byte size
22
23
    .. code-block:: python
24
25
        SomeMessage = namedstruct.Message(...)
26
        message_struct = [
27
            (b'length_in_bytes', 'B', 'vardata'),
28
            ('vardata', SomeMessage, b'length_in_bytes'),
29
        ]
30
31
    Now if our program specifies that we should have a length in bytes field
32
    we can say 'length_in_bytes' = 8, while only having 2 SomeMessage objects (assuming
33
    that the length of SomeMessage == 4).
34
35
    .. note:: The length item is specified as bytes, not as a string
36
37
3: Fixed length, in terms of namedstruct elements
38
39
    .. code-block:: python
40
41
        RepeatedMessage = Message('Repeated', [('x', 'B'), ('y', 'H')])
42
        message_struct = [
43
            ('repeated_data', RepeatedMessage, 3),
44
        ]
45
46
    Now we provide an integer that tells us that there will ALWAYS be that
47
    many messages in this message. You also no longer need to have another
48
    field that specifies the number of these messages.
49
50
4: Fixed length, in terms of bytes?
51
    TODO: write this
52
    Might have something that can only fit a certain number of bytes, like a
53
    CAN message, and this would break it up automatically?
54
"""
55
# pylint: disable=line-too-long
56
57 1
import struct
58
59 1
from typing import Optional
0 ignored issues
show
Configuration introduced by
The import typing could not be resolved.

This can be caused by one of the following:

1. Missing Dependencies

This error could indicate a configuration issue of Pylint. Make sure that your libraries are available by adding the necessary commands.

# .scrutinizer.yml
before_commands:
    - sudo pip install abc # Python2
    - sudo pip3 install abc # Python3
Tip: We are currently not using virtualenv to run pylint, when installing your modules make sure to use the command for the correct version.

2. Missing __init__.py files

This error could also result from missing __init__.py files in your module folders. Make sure that you place one file in each sub-folder.

Loading history...
60
61 1
import starstruct
62 1
from starstruct.element import register, Element
63 1
from starstruct.modes import Mode
64
65
66 1
@register
class ElementVariable(Element):
    """
    A StarStruct element that holds one or more packed sub-messages.

    The optional third item of ``field`` selects how many sub-messages the
    element contains:

    * ``str``: name of another field in the message whose value is the number
      of sub-messages (variable repeat, measured in objects).
    * ``bytes``: name of another field in the message whose value is the size
      budget in bytes (variable repeat, measured in bytes).  The name is
      decoded to ``str`` when the element is created.
    * ``int``: a fixed number of sub-messages.
    * omitted: a single sub-message; :py:meth:`make` then returns the made
      value directly instead of a list.

    :param field: The fields passed into the constructor of the element
    :param mode: The mode in which to pack the bytes
    :param alignment: Number of bytes to align to
    """

    # pylint: disable=too-many-instance-attributes
    # We need to keep track of several different styles of output here

    def __init__(self, field: list, mode: Optional[Mode]=Mode.Native, alignment: Optional[int]=1):
        # All of the type checks have already been performed by the class
        # factory
        self.name = field[0]

        # A missing third item means "exactly one sub-message, returned bare".
        try:
            self.ref = field[2]
        except IndexError:
            self.list_return = False
            self.ref = 1
        else:
            self.list_return = True

        # Variable elements don't use the normal struct format, the format is
        # a StarStruct.Message object, but change the mode to match the
        # current mode.
        self.format = field[1]

        # Set the packing style for the struct
        if isinstance(self.ref, (str, bytes)):
            self.variable_repeat = True

            # Determine whether bytes or objects are the measurement tool
            if isinstance(self.ref, str):
                self.object_length = True
            else:
                self.object_length = False

                # Change our ref to be a string, for NamedTuple
                # pylint: disable=no-member
                self.ref = self.ref.decode('utf-8')
        else:
            self.variable_repeat = False

            # TODO: If we add #4, then we would have to have a check here
            self.object_length = True

        self._mode = mode
        self._alignment = alignment
        self.update(mode, alignment)

    @staticmethod
    def valid(field: tuple) -> bool:
        """
        See :py:func:`starstruct.element.Element.valid`

        A field is accepted when it has two items and the second is a
        ``starstruct.message.Message``, or three items where additionally the
        third is a ``str``, ``int`` or ``bytes``.

        :param field: The items to determine the structure of the element
        :returns: True when the field describes a variable element
        """
        if len(field) == 2:
            return isinstance(field[1], starstruct.message.Message)

        if len(field) == 3:
            return isinstance(field[1], starstruct.message.Message) \
                and isinstance(field[2], (str, int, bytes))

        return False

    def validate(self, msg):
        """
        Ensure that the supplied message contains the required information for
        this element object to operate.

        All elements that are Variable must reference valid Length elements.

        :param msg: Mapping of field names to the element objects of the
            message this element belongs to
        :raises TypeError: if an object-length reference is not an
            ``ElementLength`` that points back at this element, or if a fixed
            repetition count is not an integer
        """
        # Imported here rather than at module level; presumably this avoids a
        # circular import between the element modules -- confirm before moving.
        from starstruct.elementlength import ElementLength

        if not self.variable_repeat:
            if not isinstance(self.ref, int):
                err = 'fixed repetition field {} reference {} not an integer'
                raise TypeError(err.format(self.name, self.ref))
            return

        # Handle object length, not byte length
        if self.object_length:
            if not isinstance(msg[self.ref], ElementLength):
                err = 'variable field {} reference {} invalid type'
                raise TypeError(err.format(self.name, self.ref))
            if msg[self.ref].ref != self.name:
                err = 'variable field {} reference {} mismatch'
                raise TypeError(err.format(self.name, self.ref))

        # Handle byte length, not object length
        # TODO: Validate the object

    def update(self, mode=None, alignment=None):
        """
        Change the mode and/or alignment of the struct format.

        An argument left as ``None`` keeps the currently stored value; the
        resulting mode and alignment are then pushed down to the sub-message
        format.

        :param mode: New packing mode, or None to keep the current one
        :param alignment: New alignment in bytes, or None to keep the current one
        """
        # Test the *arguments*, not the stored values: otherwise a call with
        # the defaults would wipe the stored mode/alignment out with None.
        if mode is not None:
            self._mode = mode

        if alignment is not None:
            self._alignment = alignment

        self.format.update(self._mode, self._alignment)

    def pack(self, msg):
        """
        Pack the provided values into bytes.

        :param msg: Mapping holding this element's value under ``self.name``
            (a single item or a list of items) and, for byte-length elements,
            the byte budget under ``self.ref``
        :returns: The concatenated packed sub-messages
        """
        # When packing use the length of the current element to determine
        # how many elements to pack, not the length element of the message
        # (which should not be specified manually).
        iterator = msg[self.name]

        if not isinstance(iterator, list):
            iterator = [iterator]

        # Named tuples are converted to dictionaries before being packed.
        iterator = [item._asdict() if hasattr(item, '_asdict') else item
                    for item in iterator]

        if self.variable_repeat:
            if self.object_length:
                ret = [self.format.pack(dict(elem)) if elem else self.format.pack({})
                       for elem in iterator]
            else:
                ret = []
                length = 0

                for elem in iterator:
                    temp_elem = self.format.pack(dict(elem))

                    # Stop at the first element that would exceed the byte
                    # budget so the packed data stays contiguous and in order.
                    if length + len(temp_elem) > msg[self.ref]:
                        break

                    ret.append(temp_elem)
                    # Track the running total so the budget is cumulative.
                    length += len(temp_elem)

        # Pack as many items as we have been given
        # and fill the rest of the bytes with empty packing
        else:
            empty_byte = struct.pack('x')
            ret = [self.format.pack(iterator[index]) if index < len(iterator) else empty_byte * len(self.format)
                   for index in range(self.ref)]

        # There is no need to make sure that the packed data is properly
        # aligned, because that should already be done by the individual
        # messages that have been packed.
        return b''.join(ret)

    def unpack(self, msg, buf):
        """
        Unpack data from the supplied buffer using the initialized format.

        :param msg: Already unpacked part of the message; the referenced
            length field is read from it with ``getattr``
        :param buf: Buffer to unpack the sub-messages from
        :returns: Tuple of (list of unpacked sub-messages, unused buffer)
        """
        # When unpacking a variable element, reference the already unpacked
        # length field to determine how many elements need unpacked.
        ret = []
        unused = buf

        if self.object_length:
            if self.variable_repeat:
                msg_range = getattr(msg, self.ref)
            else:
                msg_range = self.ref

            for _ in range(msg_range):
                (val, unused) = self.format.unpack_partial(unused)
                ret.append(val)
        else:
            byte_limit = getattr(msg, self.ref)
            length = 0

            while length < byte_limit:
                remaining = len(unused)
                (val, unused) = self.format.unpack_partial(unused)

                # Measure progress by how much of the buffer was consumed,
                # not by len(val).  NOTE(review): this assumes unpack_partial
                # returns the remaining buffer as its second item, as the
                # object-length branch above also relies on -- confirm.
                consumed = remaining - len(unused)
                if consumed <= 0:
                    # No progress was made; bail out instead of spinning.
                    break

                length += consumed
                ret.append(val)

        # There is no need to make sure that the unpacked data consumes a
        # properly aligned number of bytes because that should already be done
        # by the individual messages that have been unpacked.
        return (ret, unused)

    def make(self, msg):
        """
        Return the expected "made" value.

        :param msg: Mapping holding this element's value under ``self.name``
        :returns: A list of made sub-messages when a repeat reference was
            given at construction, otherwise the single made sub-message
        """
        value = msg[self.name]

        if self.list_return:
            return [self.format.make(val) for val in value]

        # Without a repeat reference only a single item is made; when a list
        # is supplied anyway, its first entry is used.
        maker = value[0] if isinstance(value, list) else value
        return self.format.make(maker)
253