Issues (43)

starstruct/message.py (1 issue)

1
"""StarStruct class."""
2
3 1
import collections
4
5 1
import struct
6 1
import starstruct.modes
7 1
from starstruct.element import Element
8 1
from starstruct.startuple import StarTuple
9
10
11
# pylint: disable=line-too-long
12 1
class Message(object):
13
    """An object much like NamedTuple, but with additional formatting."""
14
15
    # pylint: disable=too-many-branches
16 1
    def __init__(self, name, fields, mode=starstruct.modes.Mode.Native, alignment=1):
17
        """
18
        Initialize a StarStruct object.
19
20
        Creates 2 internal items, a format string which is used to call the
21
        struct module functions for packing and unpacking data, and a
22
        namedtuple instance which is used to organize the data provided to the
23
        pack functions and returned from the unpack functions.
24
        """
25
26
        # The name must be a string, this is provided to the
27
        # collections.namedtuple constructor when creating the namedtuple class.
28 1
        if not name or not isinstance(name, str):
29 1
            raise TypeError('invalid name: {}'.format(name))
30
31 1
        self._name = name
32 1
        self.mode = mode
33 1
        self.alignment = alignment
34
35
        # The structure definition must be a list of
36
        #   ('name', 'format', <optional>)
37
        # tuples
38 1
        if not isinstance(fields, list) \
39
                or not all(isinstance(x, tuple) for x in fields):
40
            raise TypeError('invalid fields: {}'.format(fields))
41
42 1
        if not isinstance(mode, starstruct.modes.Mode):
43 1
            raise TypeError('invalid mode: {}'.format(mode))
44
45
        # Create an ordered dictionary (so element order is preserved) out of
46
        # the individual message fields.  Ensure that there are no duplicate
47
        # field names.
48 1
        self._elements = collections.OrderedDict()
49 1
        for field in fields:
50 1
            if field[0] not in self._elements:
51 1
                if isinstance(field[0], str):
52 1
                    self._elements[field[0]] = Element.factory(field, mode, alignment)
53 1
                elif isinstance(field[0], bytes):
54 1
                    self._elements[field[0].decode('utf-8')] = Element.factory(field, mode, alignment)
55
                else:
56
                    raise NotImplementedError
57
            else:
58
                raise TypeError('duplicate field {} in {}'.format(field[0], fields))
59
60
        # Validate all of the elements of this message
61 1
        for elem in self._elements.values():
62 1
            elem.validate(self._elements)
63
64
            # Give each element information about the other elements
65 1
            elem._elements = self._elements
66
67
        # Now that the format has been validated, create a named tuple with the
68
        # correct fields.
69 1
        named_fields = [elem.name for elem in self._elements.values() if elem.name]
70 1
        self._tuple = StarTuple(self._name, named_fields, self._elements)
71
72 1
    def update(self, mode=None, alignment=None):
73
        """ Change the mode of a message. """
74 1
        if mode and not isinstance(mode, starstruct.modes.Mode):
75
            raise TypeError('invalid mode: {}'.format(mode))
76
77
        # Change the mode for all elements
78 1
        for key in self._elements.keys():
79 1
            self._elements[key].update(mode, alignment)
80
81 1
    def is_unpacked(self, other):
82
        """
83
        Provide a function that allows checking if an unpacked message tuple
84
        is an instance of what could be unpacked from a particular message
85
        object.
86
        """
87
        # First check to see if the passed in object is a namedtuple
88
        # that matches this message type
89
        if not isinstance(other, self._tuple):
90
            return False
91
92
        # Then check any element values that may be another message type to
93
        # ensure that the sub-elements are valid types.
94
        for key in self._elements.keys():
95
            if hasattr(self._elements[key].format, 'is_unpacked'):
96
                # If the format for an element is Message object (that has the
97
                # is_unpacked() function defined), call the is_unpacked()
98
                # function.
99
                msg = self._elements[key].format
100
                if not msg.is_unpacked(getattr(other, key)):
101
                    return False
102
            if hasattr(self._elements[key].format, 'keys'):
103
                # If the format for an element is a dictionary, attempt to
104
                # extract the correct item with the assumption that the ref
105
                # attribute identifies the discriminator
106
107
                # Select the correct message object based on the value of the
108
                # referenced item
109
                ref_val = getattr(other, self._elements[key].ref)
110
                if ref_val not in self._elements[key].format.keys():
111
                    return False
112
                msg = self._elements[key].format[ref_val]
113
                if not msg.is_unpacked(getattr(other, key)):
114
                    return False
115
        return True
116
117 1
    def pack(self, obj=None, **kwargs):
118
        """Pack the provided values using the initialized format."""
119
        # Handle a positional dictionary argument as well as the more generic kwargs
120 1
        if obj and isinstance(obj, dict):
121 1
            kwargs = obj
122 1
        return b''.join(elem.pack(kwargs) for elem in self._elements.values())
123
124 1
    def unpack_partial(self, buf):
125
        """
126
        Unpack a partial message from a buffer.
127
128
        This doesn't re-use the "unpack_from" function name from the struct
129
        module because the parameters and return values are not consistent
130
        between this function and the struct module.
131
        """
132 1
        msg = self._tuple._make([None] * len(self._tuple._fields))
133 1
        for elem in self._elements.values():
134 1
            (val, unused) = elem.unpack(msg, buf)
135 1
            buf = unused
136
            # Update the unpacked message with all non-padding elements
137 1
            if elem.name:
138 1
                msg = msg._replace(**dict([(elem.name, val)]))
139 1
        return (msg, buf)
140
141 1
    def unpack(self, buf):
142
        """Unpack the buffer using the initialized format."""
143 1
        (msg, unused) = self.unpack_partial(buf)
144 1
        if unused:
145 1
            error = 'buffer not fully used by unpack: {}'.format(unused)
146 1
            raise ValueError(error)
147 1
        return msg
148
149 1
    def make(self, obj=None, **kwargs):
150
        """
151
        A utility function that returns a namedtuple based on the current
152
        object's format for the supplied object.
153
        """
154 1
        if obj is not None:
155 1
            if isinstance(obj, dict):
156 1
                kwargs = obj
157
            elif isinstance(obj, tuple):
158
                kwargs = obj._asdict()
159 1
        msg = self._tuple._make([None] * len(self._tuple._fields))
160
        # Only attempt to "make" fields that are in the tuple
161 1
        for field in self._tuple._fields:
162 1
            val = self._elements[field].make(kwargs)
163 1
            msg = msg._replace(**dict([(field, val)]))
0 ignored issues
show
Usage of * or ** arguments should usually be done with care.

Generally, there is nothing wrong with usage of * or ** arguments. For readability of the code base, we suggest to not over-use these language constructs though.

For more information, we can recommend this blog post from Ned Batchelder including its comments which also touches this aspect.

Loading history...
164
165
        # msg.__packed = self.pack(**kwargs)
166 1
        return msg
167
168 1
    def __len__(self):
169 1
        if self._elements == {}:
170 1
            return 0
171
172 1
        size = 0
173 1
        for val in self._elements.values():
174 1
            if isinstance(val.format, (bytes, str)):
175 1
                size += struct.calcsize(val.format)
176 1
            elif isinstance(val.format, (dict, )):
177 1
                lengths = {len(item) for item in val.format.values()}
178 1
                if len(lengths) > 1:
179 1
                    raise AttributeError('Unable to calculate size due to differing size sub items')
180
181 1
                size += sum(lengths)
182
            else:
183
                size += len(val.format)
184
185
        return size
186