1 | """ |
||
2 | Element callable. |
||
3 | |||
4 | Call a function to validate data. |
||
5 | |||
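6 | TODO: Update the examples below to the current field format. The third item of a callable field must be either a dict keyed by 'make', 'pack' and/or 'unpack', or a one-element set; each entry is a ``(function, *references)`` tuple, where a ``str`` reference passes the made value and a ``bytes`` reference passes the packed value (see ``ElementCallable.valid``). |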
||
7 | |||
8 | .. code-block:: python |
||
9 | |||
10 | from binascii import crc32 |
||
11 | |||
12 | ExampleMessage = Message('VarTest', [('x', 'B'), ('y', 'B')]) |
||
13 | |||
14 | CRCedMessage = Message('CRCedMessage', [ |
||
15 | ('data', ExampleMessage), # A data field that has the example message in it |
||
16 | ('crc', 'I', crc32, ['data']), # Crc the data, and give an error if we have something unexpected |
||
17 | ('crc', 'I', crc32, ['data'], False), # Crc the data, but don't give an error |
||
18 | ]) |
||
19 | |||
20 | Following creating this message, you have two options: |
||
21 | |||
22 | 1. Specify a value. The function will be used to validate the value. |
||
23 | |||
24 | .. code-block:: python |
||
25 | |||
26 | def adder(x, y): |
||
27 | return x + y |
||
28 | |||
29 | AdderMessage = Message('AdderMessage', [ |
||
30 | ('item_a', 'H'), |
||
31 | ('item_b', 'B'), |
||
32 | ('function_data', 'I', adder, ['item_a', 'item_b']), |
||
33 | ]) |
||
34 | |||
35 | test_data = { |
||
36 | 'item_a': 2, |
||
37 | 'item_b': 5, |
||
38 | 'function_data': 7, |
||
39 | } |
||
40 | |||
41 | made = AdderMessage.make(test_data) |
||
42 | assert made.item_a == 2 |
||
43 | assert made.item_b == 5 |
||
44 | assert made.function_data == 7 |
||
45 | |||
46 | # If you specify the wrong value, you'll get a ValueError |
||
47 | test_data = { |
||
48 | 'item_a': 2, |
||
49 | 'item_b': 5, |
||
50 | 'function_data': 33, |
||
51 | } |
||
52 | |||
53 | try: |
||
54 | made = AdderMessage.make(test_data) |
||
55 | except ValueError: |
||
56 | print('Told you so') |
||
57 | |||
58 | # Unless you specify `False` in your original item, then |
||
59 | # nobody will care. |
||
60 | |||
61 | 2. Use the function to generate a value. |
||
62 | |||
63 | .. code-block:: python |
||
64 | |||
65 | def adder(x, y): |
||
66 | return x + y |
||
67 | |||
68 | AdderMessage = Message('AdderMessage', [ |
||
69 | ('item_a', 'H'), |
||
70 | ('item_b', 'B'), |
||
71 | ('function_data', 'I', adder, ['item_a', 'item_b']), |
||
72 | ]) |
||
73 | |||
74 | test_data = { |
||
75 | 'item_a': 2, |
||
76 | 'item_b': 5, |
||
77 | } |
||
78 | |||
79 | made = AdderMessage.make(test_data) |
||
80 | assert made.item_a == 2 |
||
81 | assert made.item_b == 5 |
||
82 | assert made.function_data == 7 |
||
83 | |||
84 | """ |
||
85 | |||
86 | 1 | import copy |
|
87 | 1 | import struct |
|
88 | |||
89 | 1 | from typing import Optional |
|
90 | |||
91 | 1 | from starstruct.element import register, Element |
|
92 | 1 | from starstruct.modes import Mode |
|
93 | |||
94 | |||
95 | # pylint: disable=too-many-instance-attributes |
||
@register
class ElementCallable(Element):
    """
    A StarStruct element whose value is generated or checked by a function.

    The element is described by a field of the form
    ``(name, format, functions[, error_on_bad_result])`` where ``functions``
    is either:

    * a dict with any of the keys ``'make'``, ``'pack'`` and ``'unpack'``,
      each mapping to a ``(function, *references)`` tuple, or
    * a set holding exactly one ``(function, *references)`` tuple, which is
      then used for all three actions.

    A ``str`` reference hands the *made* value of the named element to the
    function; a ``bytes`` reference hands over its *packed* value.

    :param field: The fields passed into the constructor of the element
    :param mode: The mode in which to pack the bytes
    :param alignment: Number of bytes to align to
    """
    # Non-reference values that validate() tolerates inside an argument list.
    # (The misspelled name is kept because it is a public class attribute.)
    accepted_mesages = (True, False)

    def __init__(self, field: list, mode: Optional[Mode] = Mode.Native, alignment: Optional[int] = 1):
        # All of the type checks have already been performed by the class
        # factory
        self.name = field[0]

        # Callable elements use the normal struct packing method
        self.format = field[1]

        if isinstance(field[2], dict):
            # One (function, *references) tuple per action; any action that
            # is not listed gets no function and an empty argument list.
            self.ref = field[2]

            self._make_func, self._make_args = self._split_action(self.ref.get('make'))
            self._pack_func, self._pack_args = self._split_action(self.ref.get('pack'))
            self._unpack_func, self._unpack_args = self._split_action(self.ref.get('unpack'))
        elif isinstance(field[2], set):
            # A single instruction shared by make, pack and unpack.  The
            # unpacking assignment also enforces "exactly one item".
            (instruction,) = field[2]
            self.ref = {'all': instruction}

            shared_func, shared_args = self._split_action(instruction)
            self._make_func = self._pack_func = self._unpack_func = shared_func
            self._make_args = self._pack_args = self._unpack_args = shared_args
        else:
            # Fail here rather than with an AttributeError on first use.
            raise TypeError('Callable element {0!r} needs a dict or a set of functions'.format(self.name))

        # An optional fourth item switches the mismatch error off.
        if len(field) == 4:
            self._error_on_bad_result = field[3]
        else:
            self._error_on_bad_result = True

        # NOTE(review): prepare_args() indexes this by element name, so it is
        # presumably replaced with a name -> element mapping by the owning
        # message before any function is called -- confirm against the caller.
        self._elements = []

        self.update(mode, alignment)

    @staticmethod
    def _split_action(action):
        """Split a ``(function, *references)`` tuple; ``None`` means no action."""
        if action is None:
            return None, ()
        return action[0], action[1:]

    @property
    def _struct(self):
        """A ``struct.Struct`` for the current mode prefix plus ``self.format``."""
        return struct.Struct(self._mode.value + self.format)

    @staticmethod
    def valid(field: tuple) -> bool:
        """
        See :py:func:`starstruct.element.Element.valid`

        A field is accepted when it has at least three items, the first two
        are strings, and the third is either a dict whose keys are a subset of
        ``'pack'``, ``'unpack'`` and ``'make'`` with tuple values, or a set
        containing exactly one tuple.

        :param field: The items to determine the structure of the element
        """
        allowed_keys = {'pack', 'unpack', 'make'}

        if len(field) < 3 or not isinstance(field[0], str) or not isinstance(field[1], str):
            return False

        functions = field[2]
        if isinstance(functions, dict):
            return set(functions) <= allowed_keys and \
                all(isinstance(val, tuple) for val in functions.values())
        if isinstance(functions, set):
            return len(functions) == 1 and \
                all(isinstance(val, tuple) for val in functions)

        return False

    def validate(self, msg):
        """
        Ensure that the supplied message contains the required information for
        this element object to operate.

        Every reference listed after a function must name a key of ``msg``.
        ``str`` references are used as-is, ``bytes``-like references are
        decoded as UTF-8 and integer references are converted to bytes and
        decoded.  Plain ``True``/``False`` arguments are skipped.

        :param msg: The message definition to look the references up in
        :raises ValueError: If a referenced key is not part of ``msg``
        """
        for action in self.ref.values():
            for arg in action[1:]:
                # isinstance() rather than ``in accepted_mesages``: tuple
                # membership compares by equality, so it would also swallow
                # 0, 1, 0.0 and 1.0.
                if isinstance(arg, bool):
                    continue

                if isinstance(arg, str):
                    key = arg
                elif hasattr(arg, 'decode'):
                    key = arg.decode('utf-8')
                elif hasattr(arg, 'to_bytes'):
                    key = arg.to_bytes((arg.bit_length() + 7) // 8, self._mode.to_byteorder()).decode('utf-8')
                else:
                    key = arg

                if key not in msg:
                    raise ValueError('Need all keys to be in the message, {0} was not found\nAction: {1} -> {2}'.format(key, action, action[1:]))

    def update(self, mode=None, alignment=None):
        """change the mode of the struct format"""
        if mode:
            self._mode = mode

        if alignment:
            self._alignment = alignment

    def pack(self, msg):
        """
        Pack the provided values into the supplied buffer.

        The pack function is called with its prepared arguments and the result
        is packed with this element's struct format.

        :param msg: The message values the pack function draws its arguments from
        :returns: The packed bytes
        """
        pack_values = self.call_func(msg, self._pack_func, self._pack_args)

        # A single value is wrapped in a list so that it can be star-unpacked
        # below just like a multi-value result.  iter() is called explicitly
        # instead of relying on a generator expression to raise.
        try:
            iter(pack_values)
        except TypeError:
            pack_values = [pack_values]

        # Unpack the items for struct to allow for multi-value
        # items to be passed in.
        return self._struct.pack(*pack_values)

    def unpack(self, msg, buf):
        """
        Unpack data from the supplied buffer using the initialized format.

        :param msg: The partially unpacked message; it must support
            ``_replace`` when this element references itself
        :param buf: The buffer to read from
        :returns: A ``(value, remaining_buffer)`` tuple
        :raises ValueError: If error checking is enabled and the unpack
            function disagrees with the value found in the buffer
        """
        packer = self._struct
        ret = packer.unpack_from(buf)
        if len(ret) == 1:
            # We only change it not to a tuple if we expected one value.
            # Otherwise, we keep it as a tuple, because that's what we would
            # expect (like for a 16I type of struct)
            ret = ret[0]

        # Only check for errors if they haven't told us not to, and only when
        # there is a function to check against: without one the comparison
        # would be against None and every unpack would fail.
        if self._error_on_bad_result and self._unpack_func is not None:
            expected_value = self.call_func(msg, self._unpack_func, self._unpack_args)

            # Check for an error
            if expected_value != ret:
                raise ValueError('Expected value was: {0}, but got: {1}'.format(
                    expected_value,
                    ret,
                ))

        # Let a self-referencing function see the value that was just read.
        if self.name in self._unpack_args:
            msg = msg._replace(**{self.name: ret})

        ret = self.call_func(msg, self._unpack_func, self._unpack_args, original=ret)

        return (ret, buf[packer.size:])

    def make(self, msg):
        """
        Return the expected "made" value

        A value of ``None`` stored under this element's name is treated the
        same as a missing value.

        :param msg: Mapping of element names to supplied values
        :returns: The supplied value when error checking is disabled (or no
            make function exists), otherwise the result of the make function
        :raises ValueError: If a supplied value differs from the function's result
        """
        provided = msg[self.name] if self.name in msg else None

        # If we aren't going to error on a bad result
        # and a value was supplied, just send the value.
        # No need to do extra work.
        if not self._error_on_bad_result and provided is not None:
            return provided

        # With no make function the supplied value is passed straight through.
        ret = self.call_func(msg, self._make_func, self._make_args, original=provided)

        if provided is not None and ret != provided:
            raise ValueError('Expected value: {0}, but got: {1}'.format(ret, provided))

        return ret

    def call_func(self, msg, func, args, original=None):
        """
        Call ``func`` with the values that ``args`` refer to.

        :param msg: The message the references are resolved against
        :param func: The function to call, or ``None``
        :param args: The references to resolve into arguments
        :param original: Returned untouched when ``func`` is ``None``
        :returns: The result of ``func``, or ``original`` without a function
        """
        if func is None:
            return original

        items = self.prepare_args(msg, args)
        return func(*items)

    def prepare_args(self, msg, args):
        """
        Resolve references into the values to hand to a function.

        A ``str`` reference yields the result of the referenced element's
        ``make``; a ``bytes`` reference (decoded as UTF-8) yields the result
        of its ``pack``.

        :param msg: The message; objects with ``_asdict`` are converted first
        :param args: The references to resolve
        :returns: A list with one value per reference
        :raises ValueError: If a reference is neither ``str`` nor ``bytes``
        """
        if hasattr(msg, '_asdict'):
            msg = msg._asdict()

        items = []
        for reference in args:
            if isinstance(reference, str):
                index, attr = reference, 'make'
            elif isinstance(reference, bytes):
                index, attr = reference.decode('utf-8'), 'pack'
            else:
                raise ValueError('Needed str or bytes for the reference')

            items.append(getattr(self._elements[index], attr)(msg))

        return items
||
289 |
Generally, there is nothing wrong with usage of
*
or**
arguments. For readability of the code base, we suggest to not over-use these language constructs though.For more information, we can recommend this blog post from Ned Batchelder including its comments which also touches this aspect.