1
|
|
|
"""StarStruct class.""" |
2
|
|
|
|
3
|
1 |
|
import collections |
4
|
|
|
|
5
|
1 |
|
import struct |
6
|
1 |
|
import starstruct.modes |
7
|
1 |
|
from starstruct.element import Element |
8
|
1 |
|
from starstruct.startuple import StarTuple |
9
|
|
|
|
10
|
|
|
|
11
|
|
|
# pylint: disable=line-too-long |
12
|
1 |
|
class Message(object): |
13
|
|
|
"""An object much like NamedTuple, but with additional formatting.""" |
14
|
|
|
|
15
|
|
|
# pylint: disable=too-many-branches |
16
|
1 |
|
def __init__(self, name, fields, mode=starstruct.modes.Mode.Native, alignment=1): |
17
|
|
|
""" |
18
|
|
|
Initialize a StarStruct object. |
19
|
|
|
|
20
|
|
|
Creates 2 internal items, a format string which is used to call the |
21
|
|
|
struct module functions for packing and unpacking data, and a |
22
|
|
|
namedtuple instance which is used to organize the data provided to the |
23
|
|
|
pack functions and returned from the unpack functions. |
24
|
|
|
""" |
25
|
|
|
|
26
|
|
|
# The name must be a string, this is provided to the |
27
|
|
|
# collections.namedtuple constructor when creating the namedtuple class. |
28
|
1 |
|
if not name or not isinstance(name, str): |
29
|
1 |
|
raise TypeError('invalid name: {}'.format(name)) |
30
|
|
|
|
31
|
1 |
|
self._name = name |
32
|
1 |
|
self.mode = mode |
33
|
1 |
|
self.alignment = alignment |
34
|
|
|
|
35
|
|
|
# The structure definition must be a list of |
36
|
|
|
# ('name', 'format', <optional>) |
37
|
|
|
# tuples |
38
|
1 |
|
if not isinstance(fields, list) \ |
39
|
|
|
or not all(isinstance(x, tuple) for x in fields): |
40
|
|
|
raise TypeError('invalid fields: {}'.format(fields)) |
41
|
|
|
|
42
|
1 |
|
if not isinstance(mode, starstruct.modes.Mode): |
43
|
1 |
|
raise TypeError('invalid mode: {}'.format(mode)) |
44
|
|
|
|
45
|
|
|
# Create an ordered dictionary (so element order is preserved) out of |
46
|
|
|
# the individual message fields. Ensure that there are no duplicate |
47
|
|
|
# field names. |
48
|
1 |
|
self._elements = collections.OrderedDict() |
49
|
1 |
|
for field in fields: |
50
|
1 |
|
if field[0] not in self._elements: |
51
|
1 |
|
if isinstance(field[0], str): |
52
|
1 |
|
self._elements[field[0]] = Element.factory(field, mode, alignment) |
53
|
1 |
|
elif isinstance(field[0], bytes): |
54
|
1 |
|
self._elements[field[0].decode('utf-8')] = Element.factory(field, mode, alignment) |
55
|
|
|
else: |
56
|
|
|
raise NotImplementedError |
57
|
|
|
else: |
58
|
|
|
raise TypeError('duplicate field {} in {}'.format(field[0], fields)) |
59
|
|
|
|
60
|
|
|
# Validate all of the elements of this message |
61
|
1 |
|
for elem in self._elements.values(): |
62
|
1 |
|
elem.validate(self._elements) |
63
|
|
|
|
64
|
|
|
# Give each element information about the other elements |
65
|
1 |
|
elem._elements = self._elements |
66
|
|
|
|
67
|
|
|
# Now that the format has been validated, create a named tuple with the |
68
|
|
|
# correct fields. |
69
|
1 |
|
named_fields = [elem.name for elem in self._elements.values() if elem.name] |
70
|
1 |
|
self._tuple = StarTuple(self._name, named_fields, self._elements) |
71
|
|
|
|
72
|
1 |
|
def update(self, mode=None, alignment=None): |
73
|
|
|
""" Change the mode of a message. """ |
74
|
1 |
|
if mode and not isinstance(mode, starstruct.modes.Mode): |
75
|
|
|
raise TypeError('invalid mode: {}'.format(mode)) |
76
|
|
|
|
77
|
|
|
# Change the mode for all elements |
78
|
1 |
|
for key in self._elements.keys(): |
79
|
1 |
|
self._elements[key].update(mode, alignment) |
80
|
|
|
|
81
|
1 |
|
def is_unpacked(self, other): |
82
|
|
|
""" |
83
|
|
|
Provide a function that allows checking if an unpacked message tuple |
84
|
|
|
is an instance of what could be unpacked from a particular message |
85
|
|
|
object. |
86
|
|
|
""" |
87
|
|
|
# First check to see if the passed in object is a namedtuple |
88
|
|
|
# that matches this message type |
89
|
|
|
if not isinstance(other, self._tuple): |
90
|
|
|
return False |
91
|
|
|
|
92
|
|
|
# Then check any element values that may be another message type to |
93
|
|
|
# ensure that the sub-elements are valid types. |
94
|
|
|
for key in self._elements.keys(): |
95
|
|
|
if hasattr(self._elements[key].format, 'is_unpacked'): |
96
|
|
|
# If the format for an element is Message object (that has the |
97
|
|
|
# is_unpacked() function defined), call the is_unpacked() |
98
|
|
|
# function. |
99
|
|
|
msg = self._elements[key].format |
100
|
|
|
if not msg.is_unpacked(getattr(other, key)): |
101
|
|
|
return False |
102
|
|
|
if hasattr(self._elements[key].format, 'keys'): |
103
|
|
|
# If the format for an element is a dictionary, attempt to |
104
|
|
|
# extract the correct item with the assumption that the ref |
105
|
|
|
# attribute identifies the discriminator |
106
|
|
|
|
107
|
|
|
# Select the correct message object based on the value of the |
108
|
|
|
# referenced item |
109
|
|
|
ref_val = getattr(other, self._elements[key].ref) |
110
|
|
|
if ref_val not in self._elements[key].format.keys(): |
111
|
|
|
return False |
112
|
|
|
msg = self._elements[key].format[ref_val] |
113
|
|
|
if not msg.is_unpacked(getattr(other, key)): |
114
|
|
|
return False |
115
|
|
|
return True |
116
|
|
|
|
117
|
1 |
|
def pack(self, obj=None, **kwargs): |
118
|
|
|
"""Pack the provided values using the initialized format.""" |
119
|
|
|
# Handle a positional dictionary argument as well as the more generic kwargs |
120
|
1 |
|
if obj and isinstance(obj, dict): |
121
|
1 |
|
kwargs = obj |
122
|
1 |
|
return b''.join(elem.pack(kwargs) for elem in self._elements.values()) |
123
|
|
|
|
124
|
1 |
|
def unpack_partial(self, buf): |
125
|
|
|
""" |
126
|
|
|
Unpack a partial message from a buffer. |
127
|
|
|
|
128
|
|
|
This doesn't re-use the "unpack_from" function name from the struct |
129
|
|
|
module because the parameters and return values are not consistent |
130
|
|
|
between this function and the struct module. |
131
|
|
|
""" |
132
|
1 |
|
msg = self._tuple._make([None] * len(self._tuple._fields)) |
133
|
1 |
|
for elem in self._elements.values(): |
134
|
1 |
|
(val, unused) = elem.unpack(msg, buf) |
135
|
1 |
|
buf = unused |
136
|
|
|
# Update the unpacked message with all non-padding elements |
137
|
1 |
|
if elem.name: |
138
|
1 |
|
msg = msg._replace(**dict([(elem.name, val)])) |
139
|
1 |
|
return (msg, buf) |
140
|
|
|
|
141
|
1 |
|
def unpack(self, buf): |
142
|
|
|
"""Unpack the buffer using the initialized format.""" |
143
|
1 |
|
(msg, unused) = self.unpack_partial(buf) |
144
|
1 |
|
if unused: |
145
|
1 |
|
error = 'buffer not fully used by unpack: {}'.format(unused) |
146
|
1 |
|
raise ValueError(error) |
147
|
1 |
|
return msg |
148
|
|
|
|
149
|
1 |
|
def make(self, obj=None, **kwargs): |
150
|
|
|
""" |
151
|
|
|
A utility function that returns a namedtuple based on the current |
152
|
|
|
object's format for the supplied object. |
153
|
|
|
""" |
154
|
1 |
|
if obj is not None: |
155
|
1 |
|
if isinstance(obj, dict): |
156
|
1 |
|
kwargs = obj |
157
|
|
|
elif isinstance(obj, tuple): |
158
|
|
|
kwargs = obj._asdict() |
159
|
1 |
|
msg = self._tuple._make([None] * len(self._tuple._fields)) |
160
|
|
|
# Only attempt to "make" fields that are in the tuple |
161
|
1 |
|
for field in self._tuple._fields: |
162
|
1 |
|
val = self._elements[field].make(kwargs) |
163
|
1 |
|
msg = msg._replace(**dict([(field, val)])) |
|
|
|
|
164
|
|
|
|
165
|
|
|
# msg.__packed = self.pack(**kwargs) |
166
|
1 |
|
return msg |
167
|
|
|
|
168
|
1 |
|
def __len__(self): |
169
|
1 |
|
if self._elements == {}: |
170
|
1 |
|
return 0 |
171
|
|
|
|
172
|
1 |
|
size = 0 |
173
|
1 |
|
for val in self._elements.values(): |
174
|
1 |
|
if isinstance(val.format, (bytes, str)): |
175
|
1 |
|
size += struct.calcsize(val.format) |
176
|
1 |
|
elif isinstance(val.format, (dict, )): |
177
|
1 |
|
lengths = {len(item) for item in val.format.values()} |
178
|
1 |
|
if len(lengths) > 1: |
179
|
1 |
|
raise AttributeError('Unable to calculate size due to differing size sub items') |
180
|
|
|
|
181
|
1 |
|
size += sum(lengths) |
182
|
|
|
else: |
183
|
|
|
size += len(val.format) |
184
|
|
|
|
185
|
|
|
return size |
186
|
|
|
|
Generally, there is nothing wrong with usage of
*
or**
arguments. For readability of the code base, we suggest to not over-use these language constructs though.For more information, we can recommend this blog post from Ned Batchelder including its comments which also touches this aspect.