sprout42 /
StarStruct
| 1 | """StarStruct fixedpoint element class.""" |
||
| 2 | # pylint: disable=line-too-long |
||
| 3 | |||
| 4 | 1 | import re |
|
| 5 | 1 | import struct |
|
| 6 | |||
| 7 | 1 | import decimal |
|
| 8 | 1 | from decimal import Decimal |
|
| 9 | |||
| 10 | 1 | from starstruct.element import register, Element |
|
| 11 | 1 | from starstruct.modes import Mode |
|
| 12 | |||
| 13 | |||
| 14 | # TODO: I think we could probably just do most of this with struct.calcsize |
||
| 15 | 1 | BITS_FORMAT = { |
|
| 16 | ('c', 'b', 'B'): 1, |
||
| 17 | ('h', 'H'): 2, |
||
| 18 | ('i', 'I', 'l', 'L'): 4, |
||
| 19 | ('q', 'Q'): 8 |
||
| 20 | } |
||
| 21 | |||
| 22 | |||
| 23 | 1 | def get_bits_length(pack_format): |
|
| 24 | """ |
||
| 25 | Helper function to return the number of bits for the format |
||
| 26 | """ |
||
| 27 | 1 | if pack_format[0] in ['@', '=', '<', '>', '+']: |
|
| 28 | 1 | pack_format = pack_format[1:] |
|
| 29 | |||
| 30 | 1 | bits = -1 |
|
| 31 | 1 | for fmt in BITS_FORMAT: |
|
| 32 | 1 | match_str = r'|'.join(fmt) |
|
| 33 | # match_str = r'(@|=|<|>|+)' + match_str |
||
| 34 | # match_str = r'*' + match_str + r'*' |
||
| 35 | |||
| 36 | 1 | if re.match(match_str, pack_format): |
|
| 37 | 1 | bits = BITS_FORMAT[fmt] * 8 |
|
| 38 | |||
| 39 | 1 | if bits == -1: |
|
| 40 | 1 | err = 'Pack format {0} was not a valid fixed point specifier' |
|
| 41 | 1 | raise ValueError(err.format(pack_format)) |
|
| 42 | |||
| 43 | 1 | return bits |
|
| 44 | |||
| 45 | |||
| 46 | 1 | def get_fixed_point(num, pack_format, precision): |
|
| 47 | """ |
||
| 48 | Helper function to get the right bytes once we're done |
||
| 49 | """ |
||
| 50 | 1 | if not isinstance(num, Decimal): |
|
| 51 | 1 | try: |
|
| 52 | 1 | num = Decimal(num) |
|
| 53 | 1 | except: |
|
| 54 | 1 | raise ValueError('Num {0} could not be converted to a Decimal'.format(num)) |
|
| 55 | |||
| 56 | 1 | bits = get_bits_length(pack_format) |
|
| 57 | |||
| 58 | 1 | if bits < precision: |
|
| 59 | 1 | raise ValueError('Format {1} too small for the given precision of {0}'.format(pack_format, precision)) |
|
| 60 | |||
| 61 | 1 | if num >= 2 ** (bits - precision): |
|
| 62 | 1 | raise ValueError('num: {0} must fit in the specified number of available bits {1}'.format(num, 8 * (bits - precision))) |
|
| 63 | |||
| 64 | 1 | num_shifted = int(num * (2 ** precision)) |
|
| 65 | 1 | return num_shifted |
|
| 66 | |||
| 67 | |||
| 68 | 1 | def get_fixed_bits(num, pack_format, precision): |
|
| 69 | """ |
||
| 70 | Helper function to get the integer portion of a fixed point value |
||
| 71 | """ |
||
| 72 | 1 | num_shifted = get_fixed_point(num, pack_format, precision) |
|
| 73 | 1 | return struct.pack(pack_format, num_shifted) |
|
| 74 | |||
| 75 | |||
| 76 | 1 | @register |
|
| 77 | 1 | class ElementFixedPoint(Element): |
|
| 78 | """ |
||
| 79 | A StarStruct element class for fixed point number fields. |
||
| 80 | |||
| 81 | Uses the built in Decimal class |
||
| 82 | |||
| 83 | Example Usage:: |
||
| 84 | |||
| 85 | from starstruct.message import Message |
||
| 86 | example_precision = 8 |
||
| 87 | example_struct = starstruct.Message('example', [('my_fixed_point', 'F', 'I', example_precision)]) |
||
| 88 | |||
| 89 | my_data = { |
||
| 90 | 'my_fixed_point': '120.0' |
||
| 91 | } |
||
| 92 | packed_struct = example_struct.make(my_data) |
||
| 93 | |||
| 94 | """ |
||
| 95 | View Code Duplication | ||
|
0 ignored issues
–
show
Duplication
introduced
by
Loading history...
|
|||
| 96 | 1 | def __init__(self, field, mode=Mode.Native, alignment=1): |
|
| 97 | """Initialize a StarStruct element object.""" |
||
| 98 | |||
| 99 | # TODO: Add checks in the class factory? |
||
| 100 | 1 | self.name = field[0] |
|
| 101 | |||
| 102 | # the ref attribute is required, but this element doesn't quite have |
||
| 103 | # one, instead use the ref field to hold the fixed point format |
||
| 104 | # attributes |
||
| 105 | 1 | self.ref = {} |
|
| 106 | |||
| 107 | 1 | self.ref['precision'] = field[3] |
|
| 108 | |||
| 109 | 1 | if len(field) == 5: |
|
| 110 | 1 | self.ref['decimal_prec'] = field[4] |
|
| 111 | else: |
||
| 112 | 1 | self.ref['decimal_prec'] = None |
|
| 113 | |||
| 114 | 1 | self._mode = mode |
|
| 115 | 1 | self._alignment = alignment |
|
| 116 | |||
| 117 | 1 | self.format = mode.value + field[2] |
|
| 118 | 1 | self._struct = struct.Struct(self.format) |
|
| 119 | |||
| 120 | 1 | @staticmethod |
|
| 121 | def valid(field): |
||
| 122 | """ |
||
| 123 | Validation function to determine if a field tuple represents a valid |
||
| 124 | fixedpoint element type. |
||
| 125 | |||
| 126 | The basics have already been validated by the Element factory class, |
||
| 127 | validate the specific struct format now. |
||
| 128 | """ |
||
| 129 | 1 | return len(field) >= 4 \ |
|
| 130 | and isinstance(field[1], str) \ |
||
| 131 | and re.match(r'\d*F', field[1]) \ |
||
| 132 | and isinstance(field[2], str) \ |
||
| 133 | and isinstance(field[3], (int, float, Decimal)) |
||
| 134 | |||
| 135 | 1 | def validate(self, msg): |
|
| 136 | """ |
||
| 137 | Ensure that the supplied message contains the required information for |
||
| 138 | this element object to operate. |
||
| 139 | |||
| 140 | The "fixedpoint" element requires no further validation. |
||
| 141 | """ |
||
| 142 | 1 | pass |
|
| 143 | |||
| 144 | 1 | def update(self, mode=None, alignment=None): |
|
| 145 | """change the mode of the struct format""" |
||
| 146 | if alignment: |
||
| 147 | self._alignment = alignment |
||
| 148 | |||
| 149 | if mode: |
||
| 150 | self._mode = mode |
||
| 151 | self.format = mode.value + self.format[1:] |
||
| 152 | # recreate the struct with the new format |
||
| 153 | self._struct = struct.Struct(self.format) |
||
| 154 | |||
| 155 | 1 | def pack(self, msg): |
|
| 156 | """Pack the provided values into the specified buffer.""" |
||
| 157 | 1 | packing_decimal = Decimal(msg[self.name]) |
|
| 158 | |||
| 159 | # integer = int(self.decimal // 1) |
||
| 160 | # top_bits = integer.to_bytes(int((self.bits - self.precision) / 8), self._mode.to_byteorder()) |
||
| 161 | # top_bits = b'{0:%db}' % (self.bits - self.precision) |
||
| 162 | # top_bits = top_bits.format(integer) |
||
| 163 | |||
| 164 | # bot_bits = b'0' * self.precision |
||
| 165 | |||
| 166 | # print('top_bits:', top_bits.bin) |
||
| 167 | # print('bot_bits:', bot_bits) |
||
| 168 | # print('all_bits:', top_bits + bot_bits) |
||
| 169 | # self._struct.pack(top_bits + bot_bits) |
||
| 170 | 1 | fixed_point = get_fixed_point(packing_decimal, self.format, self.ref['precision']) |
|
| 171 | 1 | data = self._struct.pack(fixed_point) |
|
| 172 | |||
| 173 | # If the data does not meet the alignment, add some padding |
||
| 174 | 1 | missing_bytes = len(data) % self._alignment |
|
| 175 | 1 | if missing_bytes: |
|
| 176 | data += b'\x00' * missing_bytes |
||
| 177 | 1 | return data |
|
| 178 | |||
| 179 | 1 | def unpack(self, msg, buf): |
|
| 180 | """Unpack data from the supplied buffer using the initialized format.""" |
||
| 181 | # ret = self._struct.unpack_from(buf, 0) |
||
| 182 | 1 | ret = self._struct.unpack_from(buf, 0)[0] |
|
| 183 | |||
| 184 | # Remember to remove any alignment-based padding |
||
| 185 | 1 | extra_bytes = self._alignment - 1 - (struct.calcsize(self.format) % |
|
| 186 | self._alignment) |
||
| 187 | 1 | unused = buf[struct.calcsize(self.format) + extra_bytes:] |
|
| 188 | |||
| 189 | 1 | if self.ref['decimal_prec']: |
|
| 190 | 1 | decimal.getcontext().prec = self.ref['decimal_prec'] |
|
| 191 | else: |
||
| 192 | 1 | decimal.getcontext().prec = 26 |
|
| 193 | |||
| 194 | 1 | ret_decimal = Decimal(ret) / Decimal(2 ** self.ref['precision']) |
|
| 195 | 1 | return (ret_decimal, unused) |
|
| 196 | |||
| 197 | 1 | def make(self, msg): |
|
| 198 | """Return bytes of the expected format""" |
||
| 199 | # return self._struct.pack(msg[self.name]) |
||
| 200 | return msg[self.name] |
||
| 201 |