Issues (43)

starstruct/elementescaped.py (1 issue)

1
"""
2
The escaped NamedStruct element class.
3
4
Can be used in multiple ways:
5
6
1: Variable Lengths, in terms of namedstruct elements
7
8
    .. code-block:: python
9
10
        ExampleMessage = Message('VarTest', [('x', 'B'), ('y', 'B')])
11
        TestStruct = Message('TestStruct', [
12
            ('escaped_data', ExampleMessage, {
13
                'escape': {
14
                    'start': b'\xff\x00\xff\x11',
15
                    'separator': b'\x12\x34',
16
                    'end': b'\x11\xff\x00\xff',
17
                },
18
            }),
19
        ])
20
21
    `start` is the starting escape sequence
22
    `separator` is a separating sequence
23
    `end` is the ending escape sequence
24
25
"""
26
# pylint: disable=line-too-long
27
28 1
from typing import Optional
0 ignored issues
show
The import typing could not be resolved.

This can be caused by one of the following:

1. Missing Dependencies

This error could indicate a configuration issue of Pylint. Make sure that your libraries are available by adding the necessary commands.

# .scrutinizer.yml
before_commands:
    - sudo pip install abc # Python2
    - sudo pip3 install abc # Python3
Tip: We are currently not using virtualenv to run pylint; when installing your modules, make sure to use the command for the correct version.

2. Missing __init__.py files

This error could also result from missing __init__.py files in your module folders. Make sure that you place one file in each sub-folder.

Loading history...
29
30 1
import starstruct
31 1
from starstruct.element import register, Element
32 1
from starstruct.modes import Mode
33
34
35 1
class Escapor:
    """
    Hold the byte sequences that frame an escaped element.

    Each of ``start``, ``separator`` and ``end`` may be left as ``None``;
    the matching property then reports an empty byte string so callers can
    concatenate or slice with it unconditionally.

    :param start: Sequence emitted before the first item, or ``None``
    :param separator: Sequence emitted after each item, or ``None``
    :param end: Sequence emitted after the last item, or ``None``
    :param opts: Extra options; stored as given and not interpreted here
    """

    def __init__(self, start=None, separator=None, end=None, opts=None):
        self._start = start
        self._separator = separator
        self._end = end

        self._opts = opts

    @property
    def start(self):
        """The starting escape sequence, or ``b''`` when none was given."""
        return self._start if self._start is not None else b''

    @property
    def separator(self):
        """The separating sequence, or ``b''`` when none was given."""
        return self._separator if self._separator is not None else b''

    @property
    def end(self):
        """The ending escape sequence, or ``b''`` when none was given."""
        return self._end if self._end is not None else b''
63
64
65 1
@register
class ElementEscaped(Element):
    """
    Initialize a StarStruct element object.

    The packed form is the ``start`` escape sequence, then every item packed
    with the nested message format and followed by the ``separator``
    sequence, then the ``end`` escape sequence.

    :param field: The fields passed into the constructor of the element
    :param mode: The mode in which to pack the bytes
    :param alignment: Number of bytes to align to
    """
    def __init__(self, field: list, mode: Optional[Mode]=Mode.Native, alignment: Optional[int]=1):
        # All of the type checks have already been performed by the class
        # factory
        self.name = field[0]

        # Escaped elements don't use the normal struct format, the format is
        # a StarStruct.Message object, but change the mode to match the
        # current mode.
        self.format = field[1]

        # The 'escape' dict supplies the Escapor keyword arguments
        # (start / separator / end / opts).
        self.escapor = Escapor(**field[2]['escape'])

        self._mode = mode
        self._alignment = alignment
        self.update(mode, alignment)

    @staticmethod
    def valid(field: tuple) -> bool:
        """
        See :py:func:`starstruct.element.Element.valid`

        :param field: The items to determine the structure of the element
        :returns: True when ``field`` has exactly three items, the second is
            a Message and the third is a dict containing an ``'escape'`` key
        """
        if len(field) != 3:
            return False

        return (isinstance(field[1], starstruct.message.Message)
                and isinstance(field[2], dict)
                and 'escape' in field[2])

    def validate(self, msg):
        """
        Ensure that the supplied message contains the required information for
        this element object to operate.

        Currently performs no checks.
        """
        # TODO: Any validation needed here?

    def update(self, mode=None, alignment=None):
        """
        change the mode of the struct format

        :param mode: New mode, or ``None`` to keep the current one
        :param alignment: New alignment, or ``None`` to keep the current one
        """
        # Test the incoming arguments, not the stored values, so that an
        # omitted argument leaves the current setting untouched instead of
        # overwriting it with None.
        if mode is not None:
            self._mode = mode

        if alignment is not None:
            self._alignment = alignment

        self.format.update(self._mode, self._alignment)

    def pack(self, msg):
        """
        Pack the provided values into the supplied buffer.

        :param msg: Mapping holding this element's value under ``self.name``;
            a value that is not a list is treated as a single item
        :returns: start sequence + (packed item + separator) per item + end
            sequence
        """
        items = msg[self.name]

        if not isinstance(items, list):
            items = [items]

        # Every item, including the last one, is followed by the separator;
        # unpack() relies on that layout.
        separator = self.escapor.separator
        body = b''.join(self.format.pack(item) + separator for item in items)

        # There is no need to make sure that the packed data is properly
        # aligned, because that should already be done by the individual
        # messages that have been packed.
        return self.escapor.start + body + self.escapor.end

    def unpack(self, msg, buf):
        """
        Unpack data from the supplied buffer using the initialized format.

        :param msg: Unused by this element
        :param buf: Buffer that must begin with the start escape sequence
        :returns: Tuple of (list of unpacked items, remaining unused buffer)
        :raises ValueError: If the buffer does not begin with the start
            sequence, or an item is not followed by the separator sequence
        """
        start = self.escapor.start
        separator = self.escapor.separator
        end = self.escapor.end

        # Check the starting value
        if buf[:len(start)] != start:
            # Escape sequences are arbitrary bytes and are often not valid
            # UTF-8; 'backslashreplace' keeps the intended message instead of
            # failing with a UnicodeDecodeError while formatting it.
            raise ValueError('Buf did not start with expected start sequence: {0}'.format(
                start.decode(errors='backslashreplace')))

        unused = buf[len(start):]
        ret = []

        # Items are read until the end sequence shows up directly after a
        # separator; at least one item is always unpacked.
        while True:
            (val, unused) = self.format.unpack_partial(unused)
            ret.append(val)

            if unused[:len(separator)] != separator:
                raise ValueError('Buf did not separate with expected separate sequence: {0}'.format(
                    separator.decode(errors='backslashreplace')))
            unused = unused[len(separator):]

            if unused[:len(end)] == end:
                unused = unused[len(end):]
                break

        # There is no need to make sure that the unpacked data consumes a
        # properly aligned number of bytes because that should already be done
        # by the individual messages that have been unpacked.
        return (ret, unused)

    def make(self, msg):
        """
        Return the expected "made" value

        :param msg: Mapping holding an iterable of items under ``self.name``
        :returns: List with the nested format's ``make`` result for each item
        """
        return [self.format.make(val) for val in msg[self.name]]
189