Message.__init__()   F

Complexity

Conditions 14

Size

Total Lines 55

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 21
CRAP Score 14.3828

Importance

Changes 4
Bugs 0 Features 0
Metric Value
cc 14
c 4
b 0
f 0
dl 0
loc 55
ccs 21
cts 24
cp 0.875
crap 14.3828
rs 3.4604
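
The reported CRAP score is consistent with the commonly cited formula CRAP = cc^2 * (1 - coverage)^3 + cc, where cc is the cyclomatic complexity and coverage is the covered fraction of the method. The Python sketch below uses a hypothetical helper name, `crap_score`, and assumes that `ccs` and `cts` in the table are the covered and total statement counts (21/24 = 0.875, matching `cp`).

```python
def crap_score(complexity: int, coverage: float) -> float:
    """Return the CRAP score for one method.

    coverage is a fraction in [0, 1]. The formula is the commonly cited
    complexity**2 * (1 - coverage)**3 + complexity.
    """
    if not 0.0 <= coverage <= 1.0:
        raise ValueError("coverage must be between 0 and 1")
    return complexity ** 2 * (1.0 - coverage) ** 3 + complexity


# Assumption: 'ccs' = covered statements, 'cts' = total statements.
covered, total = 21, 24
score = crap_score(14, covered / total)
print(round(score, 4))  # 14.3828
```

With full coverage the score collapses to the complexity itself (14 here), so under this formula lowering the condition count does more for the score than adding the three missing covered statements.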

How to fix

Long Method

Small methods make your code easier to understand, particularly when they have good names. A small method is also usually much easier to name well.

For example, if you find yourself adding comments to a method's body, that is usually a good sign that the commented part should be extracted into a new method; the comment is a useful starting point for naming it.

Commonly applied refactorings include Extract Method.
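
As a minimal sketch of the Extract Method refactoring, the three argument checks at the top of Message.__init__() can each move into a helper named after the comment that introduces it. Everything below is illustrative: `Mode` is a stand-in for starstruct.modes.Mode with made-up values, and `MessageSketch`, `_check_name`, `_check_fields` and `_check_mode` are hypothetical names, not part of the library.

```python
import enum


class Mode(enum.Enum):
    """Stand-in for starstruct.modes.Mode (values are illustrative)."""
    Native = '='
    Little = '<'


def _check_name(name):
    """The name must be a non-empty string."""
    if not name or not isinstance(name, str):
        raise TypeError('invalid name: {}'.format(name))


def _check_fields(fields):
    """The structure definition must be a list of tuples."""
    if not isinstance(fields, list) \
            or not all(isinstance(x, tuple) for x in fields):
        raise TypeError('invalid fields: {}'.format(fields))


def _check_mode(mode):
    """The mode must be a member of the Mode enumeration."""
    if not isinstance(mode, Mode):
        raise TypeError('invalid mode: {}'.format(mode))


class MessageSketch(object):
    """Hypothetical, trimmed-down version of Message."""

    def __init__(self, name, fields, mode=Mode.Native, alignment=1):
        # Each check is one call, so __init__ has no branches of its own.
        _check_name(name)
        _check_fields(fields)
        _check_mode(mode)

        self._name = name
        self.mode = mode
        self.alignment = alignment
        # Element construction and validation would follow here.
```

The checks run in the same order as in the original, but in this sketch the attributes are assigned only after all three checks pass. That difference is not observable by callers, because a failing check aborts construction either way.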

Complexity

Complex methods like Message.__init__() often do a lot of different things. To break such a method down, identify a cohesive component within it or within its class. A common way to find such a component is to look for fields or methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate and is often quicker to apply.
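
One cohesive component in Message.__init__() is the code that builds the ordered name-to-element mapping. The sketch below shows how Extract Class might apply to it. `ElementTable` and its methods are hypothetical names, and `factory` is a stand-in callable for Element.factory(field, mode, alignment). One behaviour differs from the listing on purpose: names are normalised to str before the duplicate check, so b'a' and 'a' count as the same field.

```python
import collections


class ElementTable(object):
    """Hypothetical helper that owns the ordered name -> element mapping."""

    def __init__(self, fields, factory):
        self._elements = collections.OrderedDict()
        for field in fields:
            self._add(field, factory)

    @staticmethod
    def _normalize(name):
        """Return the field name as str, decoding bytes as UTF-8."""
        if isinstance(name, str):
            return name
        if isinstance(name, bytes):
            return name.decode('utf-8')
        raise NotImplementedError

    def _add(self, field, factory):
        """Insert one field, rejecting duplicate names."""
        key = self._normalize(field[0])
        if key in self._elements:
            raise TypeError('duplicate field {}'.format(field[0]))
        self._elements[key] = factory(field)

    def names(self):
        """Return the field names in definition order."""
        return list(self._elements)


# Usage with a trivial stand-in factory that keeps only the format string.
table = ElementTable([('a', 'B'), (b'b', 'H')], factory=lambda f: f[1])
print(table.names())  # ['a', 'b']
```

With a helper like this, the nested if/elif/else inside the loop leaves __init__ entirely, which is where most of its 14 conditions come from.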

"""StarStruct class."""

import collections

import struct
import starstruct.modes
from starstruct.element import Element
from starstruct.startuple import StarTuple


# pylint: disable=line-too-long
class Message(object):
    """An object much like NamedTuple, but with additional formatting."""

    # pylint: disable=too-many-branches
    def __init__(self, name, fields, mode=starstruct.modes.Mode.Native, alignment=1):
        """
        Initialize a StarStruct object.

        Creates 2 internal items, a format string which is used to call the
        struct module functions for packing and unpacking data, and a
        namedtuple instance which is used to organize the data provided to the
        pack functions and returned from the unpack functions.
        """

        # The name must be a string, this is provided to the
        # collections.namedtuple constructor when creating the namedtuple class.
        if not name or not isinstance(name, str):
            raise TypeError('invalid name: {}'.format(name))

        self._name = name
        self.mode = mode
        self.alignment = alignment

        # The structure definition must be a list of
        #   ('name', 'format', <optional>)
        # tuples
        if not isinstance(fields, list) \
                or not all(isinstance(x, tuple) for x in fields):
            raise TypeError('invalid fields: {}'.format(fields))

        if not isinstance(mode, starstruct.modes.Mode):
            raise TypeError('invalid mode: {}'.format(mode))

        # Create an ordered dictionary (so element order is preserved) out of
        # the individual message fields.  Ensure that there are no duplicate
        # field names.
        self._elements = collections.OrderedDict()
        for field in fields:
            if field[0] not in self._elements:
                if isinstance(field[0], str):
                    self._elements[field[0]] = Element.factory(field, mode, alignment)
                elif isinstance(field[0], bytes):
                    self._elements[field[0].decode('utf-8')] = Element.factory(field, mode, alignment)
                else:
                    raise NotImplementedError
            else:
                raise TypeError('duplicate field {} in {}'.format(field[0], fields))

        # Validate all of the elements of this message
        for elem in self._elements.values():
            elem.validate(self._elements)

            # Give each element information about the other elements
            elem._elements = self._elements

        # Now that the format has been validated, create a named tuple with the
        # correct fields.
        named_fields = [elem.name for elem in self._elements.values() if elem.name]
        self._tuple = StarTuple(self._name, named_fields, self._elements)

    def update(self, mode=None, alignment=None):
        """ Change the mode of a message. """
        if mode and not isinstance(mode, starstruct.modes.Mode):
            raise TypeError('invalid mode: {}'.format(mode))

        # Change the mode for all elements
        for key in self._elements.keys():
            self._elements[key].update(mode, alignment)

    def is_unpacked(self, other):
        """
        Provide a function that allows checking if an unpacked message tuple
        is an instance of what could be unpacked from a particular message
        object.
        """
        # First check to see if the passed in object is a namedtuple
        # that matches this message type
        if not isinstance(other, self._tuple):
            return False

        # Then check any element values that may be another message type to
        # ensure that the sub-elements are valid types.
        for key in self._elements.keys():
            if hasattr(self._elements[key].format, 'is_unpacked'):
                # If the format for an element is Message object (that has the
                # is_unpacked() function defined), call the is_unpacked()
                # function.
                msg = self._elements[key].format
                if not msg.is_unpacked(getattr(other, key)):
                    return False
            if hasattr(self._elements[key].format, 'keys'):
                # If the format for an element is a dictionary, attempt to
                # extract the correct item with the assumption that the ref
                # attribute identifies the discriminator

                # Select the correct message object based on the value of the
                # referenced item
                ref_val = getattr(other, self._elements[key].ref)
                if ref_val not in self._elements[key].format.keys():
                    return False
                msg = self._elements[key].format[ref_val]
                if not msg.is_unpacked(getattr(other, key)):
                    return False
        return True

    def pack(self, obj=None, **kwargs):
        """Pack the provided values using the initialized format."""
        # Handle a positional dictionary argument as well as the more generic kwargs
        if obj and isinstance(obj, dict):
            kwargs = obj
        return b''.join(elem.pack(kwargs) for elem in self._elements.values())

    def unpack_partial(self, buf):
        """
        Unpack a partial message from a buffer.

        This doesn't re-use the "unpack_from" function name from the struct
        module because the parameters and return values are not consistent
        between this function and the struct module.
        """
        msg = self._tuple._make([None] * len(self._tuple._fields))
        for elem in self._elements.values():
            (val, unused) = elem.unpack(msg, buf)
            buf = unused
            # Update the unpacked message with all non-padding elements
            if elem.name:
                msg = msg._replace(**dict([(elem.name, val)]))
        return (msg, buf)

    def unpack(self, buf):
        """Unpack the buffer using the initialized format."""
        (msg, unused) = self.unpack_partial(buf)
        if unused:
            error = 'buffer not fully used by unpack: {}'.format(unused)
            raise ValueError(error)
        return msg

    def make(self, obj=None, **kwargs):
        """
        A utility function that returns a namedtuple based on the current
        object's format for the supplied object.
        """
        if obj is not None:
            if isinstance(obj, dict):
                kwargs = obj
            elif isinstance(obj, tuple):
                kwargs = obj._asdict()
        msg = self._tuple._make([None] * len(self._tuple._fields))
        # Only attempt to "make" fields that are in the tuple
        for field in self._tuple._fields:
            val = self._elements[field].make(kwargs)
            msg = msg._replace(**dict([(field, val)]))
Coding Style
Usage of * or ** arguments should usually be done with care.

Generally, there is nothing wrong with using * or ** arguments. For the readability of the code base, though, we suggest not overusing these language constructs.

For more information, we recommend the blog post by Ned Batchelder, including its comments, which also touch on this aspect.

        # msg.__packed = self.pack(**kwargs)
        return msg

    def __len__(self):
        if self._elements == {}:
            return 0

        size = 0
        for val in self._elements.values():
            if isinstance(val.format, (bytes, str)):
                size += struct.calcsize(val.format)
            elif isinstance(val.format, (dict, )):
                lengths = {len(item) for item in val.format.values()}
                if len(lengths) > 1:
                    raise AttributeError('Unable to calculate size due to differing size sub items')

                size += sum(lengths)
            else:
                size += len(val.format)

        return size
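
The coding-style note on make() concerns the call msg._replace(**dict([(field, val)])). The sketch below shows two quieter spellings. It assumes the tuple class behaves like a collections.namedtuple, which the listing's use of _make, _fields and _replace suggests; `Point` is a hypothetical stand-in, not a starstruct type.

```python
import collections

# Hypothetical stand-in for the message's namedtuple-like class.
Point = collections.namedtuple('Point', ['x', 'y'])

# Option 1: keep the loop, but unpack a dict literal instead of
# building a dict from a one-item list of pairs.
msg = Point._make([None] * len(Point._fields))
for field, val in [('x', 1), ('y', 2)]:
    msg = msg._replace(**{field: val})

# Option 2: collect the values first and construct the tuple once,
# which avoids ** entirely and does not create an intermediate tuple
# per field.
values = {'x': 1, 'y': 2}
msg_once = Point._make(values[name] for name in Point._fields)

print(msg)       # Point(x=1, y=2)
print(msg_once)  # Point(x=1, y=2)
```

Option 2 only fits when later fields do not need to see the partially filled tuple. unpack_partial() passes msg to elem.unpack(), so Option 1 is the safer change there.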