Completed
Push — master ( 1e0336...03e2d5 )
by
unknown
08:19
created

ElementVariable.make()   A

Complexity

Conditions 4

Size

Total Lines 15

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 20

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 4
c 2
b 0
f 0
dl 0
loc 15
rs 9.2
ccs 0
cts 0
cp 0
crap 20
1
"""
2
The variable NamedStruct element class.
3
4 1
Can be used in multiple ways ways:
5
6 1
1: Variable Lengths, in terms of namedstruct elements
7 1
8 1
    .. code-block:: python
9
10
        ExampleMessage = Message('VarTest', [('x', 'B'), ('y', 'B')])
11 1
        message_struct = [
12 1
            ('length_in_objects', 'H', 'vardata'),            # length field
13
            ('vardata', ExampleMessage, 'length_in_objects')  # variable length data
14
        ]
15
16
    The length is the string and you can think of it as "linking" to the
17
    length that is provided in the length field.
18
19
    .. note:: The length item is specified as a string, not as bytes
20
21
2: Variable lengths, in terms of byte size
22
23
    .. code-block:: python
24
25
        SomeMessage = namedstruct.Message(...)
26
        message_struct = [
27
            (b'length_in_bytes', 'B', 'vardata'),
28
            ('vardata', SomeMessage, b'length_in_bytes'),
29
        ]
30
31
    Now if our program specifies taht we should have a length in bytes field
32
    we can say 'length_in_bytes' = 8, while only have 2 SomeMessage, (assuming
33
    that the length of SomeMessge == 4).
34
35
    .. note:: The length item is specified as bytes, not as a string
36
37
3: Fixed length, in terms of namedstruct elements
38
39
    .. code-block:: python
40
41
        RepeatedMessage = Message('Repeated', [('x', 'B'), ('y', 'H')])
42
        message_struct = [
43
            ('repeated_data', RepeatedMessage, 3),
44
        ]
45
46
    Now we provide an integer that tells us that there will ALWAYS be that
47
    many messages in this message. You also no longer need to have another
48
    field that specifies the number of these messages.
49
50
4: Fixed length, in terms of bytes?
51
    TODO: write this
52
    Might have something that can only fit a certain number of bytes, like a
53
    CAN message, and this would break it up automatically?
54
"""
55
# pylint: disable=line-too-long
56
57
import struct
58
59
from typing import Optional
0 ignored issues
show
Configuration introduced by
The import typing could not be resolved.

This can be caused by one of the following:

1. Missing Dependencies

This error could indicate a configuration issue of Pylint. Make sure that your libraries are available by adding the necessary commands.

# .scrutinizer.yml
before_commands:
    - sudo pip install abc # Python2
    - sudo pip3 install abc # Python3
Tip: We are currently not using virtualenv to run pylint, when installing your modules make sure to use the command for the correct version.

2. Missing __init__.py files

This error could also result from missing __init__.py files in your module folders. Make sure that you place one file in each sub-folder.

Loading history...
60
61
import starstruct
62
from starstruct.element import register, Element
63
from starstruct.modes import Mode
64
65
66
@register
67
class ElementVariable(Element):
68
    """
69 1
    Initialize a StarStruct element object.
70
71
    :param field: The fields passed into the constructor of the element
72
    :param mode: The mode in which to pack the bytes
73
    :param alignment: Number of bytes to align to
74 1
    """
75 1
76
    # pylint: disable=too-many-instance-attributes
77
    # We need to keep track of several different styles of output here
78
79
    def __init__(self, field: list, mode: Optional[Mode]=Mode.Native, alignment: Optional[int]=1):
80 1
        # All of the type checks have already been performed by the class
81
        # factory
82
        self.name = field[0]
83 1
84 1
        try:
85
            self.list_return = True
86
            self.ref = field[2]
87 1
        except IndexError:
88 1
            self.list_return = False
89
            self.ref = 1
90 1
91
        # Variable elements don't use the normal struct format, the format is
92
        # a StarStruct.Message object, but change the mode to match the
93 1
        # current mode.
94
        self.format = field[1]
95 1
96
        # Set the packing style for the struct
97
        if isinstance(self.ref, (str, bytes)):
98 1
            self.variable_repeat = True
99
100 1
            # Determine whether bytes or objects are the measurement tool
101
            if isinstance(self.ref, str):
102 1
                self.object_length = True
103
            elif isinstance(self.ref, bytes):
104
                self.object_length = False
105
106
                # Change our ref to be a string, for NamedTuple
107
                self.ref = self.ref.decode('utf-8')
0 ignored issues
show
Bug introduced by
The Instance of int does not seem to have a member named decode.

This check looks for calls to members that are non-existent. These calls will fail.

The member could have been renamed or removed.

Loading history...
108
109
        else:
110
            self.variable_repeat = False
111 1
112
            # TODO: If we add #4, then we would have to have a check here
113
            self.object_length = True
114
115 1
        self._mode = mode
116
        self._alignment = alignment
117
        self.update(mode, alignment)
118
119
    @staticmethod
120
    def valid(field: tuple) -> bool:
121
        """
122 1
        See :py:func:`starstruct.element.Element.valid`
123 1
124
        :param field: The items to determine the structure of the element
125 1
        """
126 1
        if len(field) == 2:
127
            return isinstance(field[1], starstruct.message.Message)
128
        elif len(field) == 3:
129 1
            return isinstance(field[1], starstruct.message.Message) \
130
                and isinstance(field[2], (str, int, bytes))
131
        else:
132
            return False
133
134
    def validate(self, msg):
135 1
        """
136
        Ensure that the supplied message contains the required information for
137 1
        this element object to operate.
138
139
        All elements that are Variable must reference valid Length elements.
140
        """
141 1
        from starstruct.elementlength import ElementLength
142
        if self.variable_repeat:
143 1
            # Handle object length, not byte length
144 1
            if self.object_length:
145 1
                if not isinstance(msg[self.ref], ElementLength):
146
                    err = 'variable field {} reference {} invalid type'
147 1
                    raise TypeError(err.format(self.name, self.ref))
148
                elif not msg[self.ref].ref == self.name:
149
                    err = 'variable field {} reference {} mismatch'
150
                    raise TypeError(err.format(self.name, self.ref))
151
            # Handle byte length, not object length
152 1
            else:
153 1
                # TODO: Validate the object
154 1
                pass
155
        else:
156
            if not isinstance(self.ref, int):
157 1
                err = 'fixed repetition field {} reference {} not an integer'
158 1
                raise TypeError(err.format(self.name, self.ref))
159
160 1
    def update(self, mode=None, alignment=None):
161 1
        """change the mode of the struct format"""
162
        if self._mode is not None:
163 1
            self._mode = mode
164 1
165
        if self._alignment is not None:
166
            self._alignment = alignment
167
168
        self.format.update(self._mode, self._alignment)
169 1
170 1
    def pack(self, msg):
171
        """Pack the provided values into the supplied buffer."""
172
        # When packing use the length of the current element to determine
173
        # how many elements to pack, not the length element of the message
174
        # (which should not be specified manually).
175
        iterator = msg[self.name]
176 1
177
        if not isinstance(iterator, list):
178 1
            iterator = [iterator]
179
180
        if self.variable_repeat:
181
            if self.object_length:
182 1
                ret = [self.format.pack(dict(elem)) if elem else self.format.pack({})
183 1
                       for elem in iterator]
184 1
            else:
185 1
                ret = []
186 1
                length = 0
187 1
188
                for elem in iterator:
189 1
                    temp_elem = self.format.pack(dict(elem))
190 1
191 1
                    if length + len(temp_elem) <= msg[self.ref]:
192 1
                        ret.append(temp_elem)
193 1
194
        # Pack as many bytes as we have been given
195
        # and fill the rest of the byets with empty packing
196
        else:
197
            empty_byte = struct.pack('x')
198 1
            ret = [self.format.pack(iterator[index]) if index < len(iterator) else empty_byte * len(self.format)
199
                   for index in range(self.ref)]
200 1
201
        # There is no need to make sure that the packed data is properly
202 1
        # aligned, because that should already be done by the individual
203 1
        # messages that have been packed.
204 1
        return b''.join(ret)
205 1
206
    def unpack(self, msg, buf):
207
        """Unpack data from the supplied buffer using the initialized format."""
208
        # When unpacking a variable element, reference the already unpacked
209
        # length field to determine how many elements need unpacked.
210
        ret = []
211
        unused = buf
212
213
        if self.object_length:
214
            if self.variable_repeat:
215
                msg_range = getattr(msg, self.ref)
216
            else:
217
                msg_range = self.ref
218
219
            for _ in range(msg_range):
220
                (val, unused) = self.format.unpack_partial(unused)
221
                ret.append(val)
222
        else:
223
            length = 0
224
            while length < getattr(msg, self.ref):
225
                (val, unused) = self.format.unpack_partial(unused)
226
                length += len(val)
227
                ret.append(val)
228
229
        # There is no need to make sure that the unpacked data consumes a
230
        # properly aligned number of bytes because that should already be done
231
        # by the individual messages that have been unpacked.
232
        return (ret, unused)
233
234
    def make(self, msg):
235
        """Return the expected "made" value"""
236
        if self.list_return:
237
            ret = []
238
            for val in msg[self.name]:
239
                ret.append(self.format.make(val))
240
        else:
241
            if isinstance(msg[self.name], list):
242
                maker = msg[self.name][0]
243
            else:
244
                maker = msg[self.name]
245
246
            ret = self.format.make(maker)
247
248
        return ret
249