Completed
Push — master ( 321b56...1e0336 )
by
unknown
06:04
created

Escapor.__init__()   A

Complexity

Conditions 1

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 6
rs 9.4285
1
"""
2
The escaped NamedStruct element class.
3
4
Can be used in multiple ways ways:
5
6
1: Variable Lengths, in terms of namedstruct elements
7
8
    .. code-block:: python
9
10
        ExampleMessage = Message('VarTest', [('x', 'B'), ('y', 'B')])
11
        TestStruct = Message('TestStruct', [
12
            ('escaped_data', ExampleMessage, {
13
                'escape': {
14
                    'start': b'\xff\x00\xff\x11',
15
                    'separator': b'\x12\x34',
16
                    'end': b'\x11\xff\x00\xff',
17
                },
18
            }),
19
        ])
20
21
    `start` is the starting escape sequence
22
    `separator` is a separating sequence
23
    `end` is the ending escape sequence
24
25
"""
26
# pylint: disable=line-too-long
27
28
from typing import Optional
0 ignored issues
show
Configuration introduced by
The import typing could not be resolved.

This can be caused by one of the following:

1. Missing Dependencies

This error could indicate a configuration issue of Pylint. Make sure that your libraries are available by adding the necessary commands.

# .scrutinizer.yml
before_commands:
    - sudo pip install abc # Python2
    - sudo pip3 install abc # Python3
Tip: We are currently not using virtualenv to run pylint, when installing your modules make sure to use the command for the correct version.

2. Missing __init__.py files

This error could also result from missing __init__.py files in your module folders. Make sure that you place one file in each sub-folder.

Loading history...
29
30
import starstruct
31
from starstruct.element import register, Element
32
from starstruct.modes import Mode
33
34
35
class Escapor:
36
    def __init__(self, start=None, separator=None, end=None, opts=None):
37
        self._start = start
38
        self._separator = separator
39
        self._end = end
40
41
        self._opts = opts
42
43
    @property
44
    def start(self):
45
        if self._start is not None:
46
            return self._start
47
        else:
48
            return b''
49
50
    @property
51
    def separator(self):
52
        if self._separator is not None:
53
            return self._separator
54
        else:
55
            return b''
56
57
    @property
58
    def end(self):
59
        if self._end is not None:
60
            return self._end
61
        else:
62
            return b''
63
64
65
@register
66
class ElementEscaped(Element):
67
    """
68
    Initialize a StarStruct element object.
69
70
    :param field: The fields passed into the constructor of the element
71
    :param mode: The mode in which to pack the bytes
72
    :param alignment: Number of bytes to align to
73
    """
74
    def __init__(self, field: list, mode: Optional[Mode]=Mode.Native, alignment: Optional[int]=1):
75
        # All of the type checks have already been performed by the class
76
        # factory
77
        self.name = field[0]
78
79
        # Escaped elements don't use the normal struct format, the format is
80
        # a StarStruct.Message object, but change the mode to match the
81
        # current mode.
82
        self.format = field[1]
83
84
        self.escapor = Escapor(**field[2]['escape'])
85
86
        self._mode = mode
87
        self._alignment = alignment
88
        self.update(mode, alignment)
89
90
    @staticmethod
91
    def valid(field: tuple) -> bool:
92
        """
93
        See :py:func:`starstruct.element.Element.valid`
94
95
        :param field: The items to determine the structure of the element
96
        """
97
        if len(field) == 3:
98
            return isinstance(field[1], starstruct.message.Message) \
99
                and isinstance(field[2], dict) \
100
                and 'escape' in field[2].keys()
101
        else:
102
            return False
103
104
    def validate(self, msg):
105
        """
106
        Ensure that the supplied message contains the required information for
107
        this element object to operate.
108
109
        All elements that are Variable must reference valid Length elements.
110
        """
111
        # TODO: Any validation needed here?
112
        pass
113
114
    def update(self, mode=None, alignment=None):
115
        """change the mode of the struct format"""
116
        if self._mode is not None:
117
            self._mode = mode
118
119
        if self._alignment is not None:
120
            self._alignment = alignment
121
122
        self.format.update(self._mode, self._alignment)
123
124
    def pack(self, msg):
125
        """Pack the provided values into the supplied buffer."""
126
        # When packing use the length of the current element to determine
127
        # how many elements to pack, not the length element of the message
128
        # (which should not be specified manually).
129
        iterator = msg[self.name]
130
131
        if not isinstance(iterator, list):
132
            iterator = [iterator]
133
134
        ret = self.escapor.start
135
136
        for item in iterator:
137
            ret += self.format.pack(item)
138
139
            ret += self.escapor.separator
140
141
        ret += self.escapor.end
142
143
        # There is no need to make sure that the packed data is properly
144
        # aligned, because that should already be done by the individual
145
        # messages that have been packed.
146
        return ret
147
148
    def unpack(self, msg, buf):
149
        """Unpack data from the supplied buffer using the initialized format."""
150
        # When unpacking a variable element, reference the already unpacked
151
        # length field to determine how many elements need unpacked.
152
        ret = []
153
154
        # Check the starting value
155
        if buf[:len(self.escapor.start)] == self.escapor.start:
156
            buf = buf[len(self.escapor.start):]
157
        else:
158
            raise ValueError('Buf did not start with expected start sequence: {0}'.format(
159
                self.escapor.start.decode()))
160
161
        unused = buf
162
163
        while True:
164
            (val, unused) = self.format.unpack_partial(unused)
165
            ret.append(val)
166
167
            if unused[:len(self.escapor.separator)] == self.escapor.separator:
168
                unused = unused[len(self.escapor.separator):]
169
            else:
170
                raise ValueError('Buf did not separate with expected separate sequence: {0}'.format(
171
                    self.escapor.separator.decode()))
172
173
            if unused[:len(self.escapor.end)] == self.escapor.end:
174
                unused = unused[len(self.escapor.end):]
175
                break
176
177
        # There is no need to make sure that the unpacked data consumes a
178
        # properly aligned number of bytes because that should already be done
179
        # by the individual messages that have been unpacked.
180
        return (ret, unused)
181
182
    def make(self, msg):
183
        """Return the expected "made" value"""
184
        ret = []
185
        for val in msg[self.name]:
186
            ret.append(self.format.make(val))
187
188
        return ret
189