sprout42 /
StarStruct
| 1 | """StarStruct element class.""" |
||
| 2 | |||
| 3 | 1 | import struct |
|
| 4 | 1 | import re |
|
| 5 | |||
| 6 | 1 | from starstruct.element import register, Element |
|
| 7 | 1 | from starstruct.modes import Mode |
|
| 8 | |||
| 9 | |||
@register
class ElementString(Element):
    """
    A StarStruct element for strings, because standard string treatment of
    pack/unpack can be inconvenient.

    This element will encode and decode string type elements from and to forms
    that are easier to use and manage.

    Three struct format characters are handled, each with an optional repeat
    count:

    - ``s``: fixed-size string, unpacked to a ``str`` with trailing NUL
      padding removed.
    - ``p``: pascal string, unpacked to a ``str`` with padding left in place.
    - ``c``: char array, unpacked to a list of single-character ``str``.
    """

    def __init__(self, field, mode=Mode.Native, alignment=1):
        """
        Initialize a StarStruct element object.

        :param field: ``(name, format)`` tuple; ``format`` is a struct format
            string without a byte-order prefix (see :meth:`valid`).
        :param mode: mode whose ``value`` is prepended to the format string.
        :param alignment: packed data is NUL padded to a multiple of this
            many bytes.
        :raises struct.error: if the resulting format string is not a valid
            struct format.
        """
        # All of the type checks have already been performed by the class
        # factory
        self.name = field[0]

        # The ref attribute is required for all elements, but base element
        # types don't have one
        self.ref = None

        self._mode = mode
        self._alignment = alignment

        # Creating the "Struct" instance now validates the format early (it
        # would otherwise only be checked on the first struct call) and
        # avoids re-parsing the format on every pack/unpack.
        self.format = mode.value + field[1]
        self._struct = struct.Struct(self.format)

    @staticmethod
    def valid(field):
        """
        Validation function to determine if a field tuple represents a valid
        string element type.

        A valid field is a 2-tuple whose second item is a string made up of
        an optional repeat count followed by exactly one of ``c``, ``s`` or
        ``p``. The whole string must match, so multi-specifier formats such
        as ``'4sI'`` are rejected.

        :returns: ``True`` if this element type can handle the field.
        """
        return (len(field) == 2
                and isinstance(field[1], str)
                and re.fullmatch(r'\d*[csp]', field[1]) is not None)

    def validate(self, msg):
        """
        Ensure that the supplied message contains the required information for
        this element object to operate.

        The "string" element requires no further validation.
        """
        pass

    def update(self, mode=None, alignment=None):
        """
        Change the mode and/or alignment of the struct format.

        Arguments that are falsy (``None``, or an alignment of 0) leave the
        current setting untouched.
        """
        if alignment:
            self._alignment = alignment

        if mode:
            self._mode = mode
            # The first character of the format is the previous mode prefix
            # (assumes mode values are a single character).
            self.format = mode.value + self.format[1:]
            # recreate the struct with the new format
            self._struct = struct.Struct(self.format)

    def pack(self, msg):
        """
        Pack the value stored under this element's name in ``msg``.

        ``s`` and ``p`` fields accept ``bytes`` or ``str``. ``c`` fields
        accept ``bytes``, ``str``, a sequence of single-byte ``bytes`` or a
        list of single-character ``str``. Short values are NUL padded. The
        supplied value is never modified.

        :returns: the packed ``bytes``, NUL padded to the element alignment.
        :raises AssertionError: if the value is longer than the field or is
            of an unsupported type.
        """
        val = msg[self.name]
        size = struct.calcsize(self.format)
        assert len(val) <= size, 'value too long for string element'

        kind = self.format[-1]
        if kind in ('s', 'p'):
            if not isinstance(val, bytes):
                assert isinstance(val, str), \
                    'string element value must be bytes or str'
                val = val.encode()
            if kind == 'p' and len(val) < size:
                # 'p' (pascal strings) must be the exact size of the format
                val += b'\x00' * (size - len(val))

            data = self._struct.pack(val)
        else:  # 'c'
            # Always build a new list: padding must not leak into the
            # caller's message, and str/bytes/tuple values (including empty
            # ones) cannot be extended in place.
            if isinstance(val, bytes):
                chars = [bytes([c]) for c in val]
            elif all(isinstance(c, bytes) for c in val):
                chars = list(val)
            else:
                # last option, it could be a string, or a list of strings
                assert (isinstance(val, list) and
                        all(isinstance(c, str) for c in val)) or \
                    isinstance(val, str), \
                    'char element value must be bytes, str or a list of them'
                chars = [c.encode() for c in val]
            chars.extend([b'\x00'] * (size - len(chars)))
            data = self._struct.pack(*chars)

        # If the data does not meet the alignment, add some padding
        padding = (-len(data)) % self._alignment
        return data + b'\x00' * padding

    def unpack(self, msg, buf):
        """
        Unpack data from the supplied buffer using the initialized format.

        :param msg: not used by this element.
        :param buf: buffer whose first bytes hold this element's packed data.
        :returns: ``(value, unused)`` where ``unused`` is what remains of
            ``buf`` after this element and its alignment padding.
        """
        ret = self._struct.unpack_from(buf, 0)

        # Remember to remove any alignment-based padding; this must skip
        # exactly the number of bytes that pack() appends.
        size = struct.calcsize(self.format)
        padding = (-size) % self._alignment
        unused = buf[size + padding:]

        kind = self.format[-1]
        if kind == 's':
            # for 's' formats, convert to a string and strip the trailing
            # padding (leading NULs are data, not padding)
            val = ret[0].decode().rstrip('\x00')
        elif kind == 'p':
            # for 'p' formats, convert to a string, but leave the padding
            val = ret[0].decode()
        else:  # 'c'
            # Just in case we have some ints in the message
            val = [chr(c) if isinstance(c, int) else c.decode()
                   for c in ret]
        return (val, unused)

    def make(self, msg):
        """
        Return the value from ``msg`` in the form unpack() would produce.

        Lists of chars/bytes and ``bytes`` values are converted to ``str``.
        ``p`` values are then NUL padded to one less than the format size,
        and ``c`` values are NUL padded to the format size and returned as a
        list of characters.

        :raises AssertionError: if the value is longer than the field.
        :raises TypeError: if a list value mixes ``str`` and ``bytes`` items
            or holds other types.
        """
        val = msg[self.name]
        size = struct.calcsize(self.format)
        assert len(val) <= size, 'value too long for string element'

        # If the supplied value is a list of chars, or a list of bytes, turn
        # it into a string for ease of processing.
        if isinstance(val, list):
            if all(isinstance(c, bytes) for c in val):
                val = ''.join(c.decode() for c in val)
            elif all(isinstance(c, str) for c in val):
                val = ''.join(val)
            else:
                error = 'Invalid value for string element: {}'
                raise TypeError(error.format(val))
        elif isinstance(val, bytes):
            # If the supplied value is bytes, decode it into a normal string
            val = val.decode()

        kind = self.format[-1]

        # 'p' (pascal strings) and 'c' (char list) must be the exact size of
        # the format
        if kind == 'p' and len(val) < size - 1:
            val += '\x00' * (size - len(val) - 1)

        # Lastly, 'c' (char list) formats are expected to be a list of
        # characters rather than a string.
        if kind == 'c':
            val += '\x00' * (size - len(val))
            val = list(val)

        return val
| 163 |
Generally, there is nothing wrong with using `*` or `**` arguments. For the readability of the code base, however, we suggest not over-using these language constructs.
For more information, we recommend this blog post from Ned Batchelder, including its comments, which also touch on this aspect.