Completed
Push — master ( 8b3414...5729ab )
by Ryan
01:13
created

NamedStruct.make_tuple()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
dl 0
loc 3
rs 10
c 1
b 0
f 0
1
'A collection of general purpose tools for reading files'
2
3
# Copyright (c) 2008-2015 MetPy Developers.
4
# Distributed under the terms of the BSD 3-Clause License.
5
# SPDX-License-Identifier: BSD-3-Clause
6
7
from __future__ import print_function
8
import logging
9
import zlib
10
from collections import namedtuple
11
from struct import Struct
12
13
from ..units import UndefinedUnitError, units
14
15
# Module-level logger; only warnings and above are emitted unless the level is changed.
log = logging.getLogger('metpy.io.tools')
log.setLevel(logging.WARNING)
17
18
19
# Early Python 2.7 releases have a Struct.unpack_from() that rejects a bytearray, so the
# data are wrapped in a ``buffer`` there. Python 3 has no ``buffer`` builtin (looking it up
# raises NameError), so ``memoryview`` is used instead; it is also cheaper than making
# another bytearray copy.
try:
    bytearray_to_buff = buffer  # noqa: F821 -- builtin on Python 2 only
except NameError:
    bytearray_to_buff = memoryview
26
27
28
class UnitLinker(object):
    r'''Wraps a :class:`metpy.io.cdm.Variable` and converts any attached unit attribute
    to a :class:`pint.Unit`. It also handles converting data returns to be instances
    of :class:`pint.Quantity` rather than bare (unit-less) arrays.

    Any attribute not defined on the wrapper itself is looked up on the wrapped
    variable.
    '''

    def __init__(self, var):
        r'''Construct a new :class:`UnitLinker`.

        Parameters
        ----------
        var : Variable
            The :class:`metpy.io.cdm.Variable` to be wrapped.
        '''
        self._var = var
        try:
            self._unit = units(self._var.units)
        except (AttributeError, UndefinedUnitError):
            # No ``units`` attribute, or one the registry does not know: leave data bare
            self._unit = None

    def __getitem__(self, ind):
        r'Get data from the underlying variable and add units'
        ret = self._var[ind]
        return ret if self._unit is None else ret * self._unit

    def __getattr__(self, item):
        r'Forward all attribute access onto underlying variable'
        # __getattr__ only runs when normal lookup fails. If ``_var`` itself is missing
        # (an instance made without __init__, as copy and pickle do), forwarding through
        # ``self._var`` would re-enter this method forever, so fail cleanly instead.
        if item == '_var':
            raise AttributeError(item)
        return getattr(self._var, item)

    @property
    def units(self):
        r'''Access the units attached to the data.

        This is ``None`` when the underlying variable had no usable units.
        '''
        return self._unit

    @units.setter
    def units(self, val):
        r'Override the units on the underlying variable'
        # Accept a ready-made unit as-is; anything else is parsed by the registry
        if isinstance(val, units.Unit):
            self._unit = val
        else:
            self._unit = units(val)
68
69
70
class NamedStruct(Struct):
    """Parse binary data into a namedtuple described by a list of fields.

    Parameters
    ----------
    info : sequence of tuples
        One ``(name, format)`` or ``(name, format, converter)`` entry per field. Entries
        with an empty name (e.g. padding) contribute their format but no tuple field.
        Entries with an empty format contribute a tuple field but no format; such
        fields are filled with ``None`` when unpacking.
    prefmt : str, optional
        Text placed in front of the joined field formats (e.g. a byte order character).
    tuple_name : str, optional
        Name given to the generated namedtuple type. Defaults to ``'NamedStruct'``.
    """

    def __init__(self, info, prefmt='', tuple_name=None):
        if tuple_name is None:
            tuple_name = 'NamedStruct'

        # Index explicitly instead of ``zip(*info)``: entries may have two or three
        # items, and zip would hand back three sequences if every entry had a converter.
        names = [entry[0] for entry in info]
        fmts = [entry[1] for entry in info]

        # Map position in the unpacked values -> converter. Unnamed entries seen so far
        # are subtracted from the index when locating a converter's value.
        self.converters = {}
        conv_off = 0
        for ind, entry in enumerate(info):
            if len(entry) > 2:
                self.converters[ind - conv_off] = entry[-1]
            elif not entry[0]:  # Skip items with no name
                conv_off += 1

        self._tuple = namedtuple(tuple_name, ' '.join(n for n in names if n))
        super(NamedStruct, self).__init__(prefmt + ''.join(f for f in fmts if f))

    def _create(self, items):
        """Apply converters to the unpacked values and build the namedtuple."""
        num_fields = len(self._tuple._fields)
        if self.converters or len(items) < num_fields:
            items = list(items)
            for ind, conv in self.converters.items():
                items[ind] = conv(items[ind])
            # Fields without a format have no unpacked value; fill them with None
            # whether or not any converters are present.
            items.extend([None] * (num_fields - len(items)))
        return self.make_tuple(*items)

    def make_tuple(self, *args, **kwargs):
        'Construct the underlying tuple from values'
        return self._tuple(*args, **kwargs)

    def unpack(self, s):
        """Unpack the bytes `s` and return the values as a namedtuple."""
        return self._create(super(NamedStruct, self).unpack(s))

    def unpack_from(self, buff, offset=0):
        """Unpack from `buff` starting at `offset` and return a namedtuple."""
        return self._create(super(NamedStruct, self).unpack_from(buff, offset))

    def unpack_file(self, fobj):
        """Read ``self.size`` bytes from `fobj` and unpack them into a namedtuple."""
        return self.unpack(fobj.read(self.size))
106
107
108
# This works around times when we have more than 255 items and can't use
109
# NamedStruct. This is a CPython limit for arguments.
110
class DictStruct(Struct):
    """Parse binary data into a dict keyed by field name.

    `info` is a sequence of ``(name, format)`` pairs; entries with an empty name
    contribute their format but no dictionary key. `prefmt` is placed in front of the
    joined formats.
    """

    def __init__(self, info, prefmt=''):
        field_names, field_fmts = zip(*info)

        # Keep only the fields that actually have a name
        self._names = list(filter(None, field_names))

        full_fmt = prefmt + ''.join(filter(None, field_fmts))
        super(DictStruct, self).__init__(full_fmt)

    def _create(self, items):
        """Pair the unpacked values with the stored names."""
        pairs = zip(self._names, items)
        return dict(pairs)

    def unpack(self, s):
        """Unpack the bytes `s` into a dict."""
        values = super(DictStruct, self).unpack(s)
        return self._create(values)

    def unpack_from(self, buff, offset=0):
        """Unpack from `buff` starting at `offset` into a dict."""
        values = super(DictStruct, self).unpack_from(buff, offset)
        return self._create(values)
127
128
129
class Enum(object):
    """Callable that maps integer codes to names.

    Positional arguments are assigned the codes 0, 1, 2, ... in order; keyword
    arguments are given as ``name=code``.
    """

    def __init__(self, *args, **kwargs):
        # Positional names are numbered from 0
        self.val_map = dict(enumerate(args))

        # Keyword arguments arrive as name -> code; store them flipped as code -> name
        for name, code in kwargs.items():
            self.val_map[code] = name

    def __call__(self, val):
        """Return the name for `val`, or an 'Unknown' string if it is not mapped."""
        fallback = 'Unknown ({})'.format(val)
        return self.val_map.get(val, fallback)
142
143
144
class Bits(object):
    """Callable that splits an integer into a list of per-bit booleans."""

    def __init__(self, num_bits):
        self._bits = range(num_bits)

    def __call__(self, val):
        """Return one bool per bit of `val`, least-significant bit first."""
        flags = []
        for shift in self._bits:
            flags.append(bool((val >> shift) & 0x1))
        return flags
150
151
152
class BitField(object):
    """Callable that maps the set bits of an integer to names.

    The first name corresponds to the least-significant bit, the second to the next
    bit, and so on.
    """

    def __init__(self, *names):
        self._names = names

    def __call__(self, val):
        """Return the name(s) whose bit is set in `val`.

        Gives ``None`` for a false `val`, the bare name when exactly one named bit is
        set, and otherwise a list of names (possibly empty).
        """
        if not val:
            return None

        selected = [name for pos, name in enumerate(self._names) if (val >> pos) & 0x1]

        # A single match is returned bare; zero or several come back as a list
        if len(selected) == 1:
            return selected[0]
        return selected
170
171
172
class Array(object):
    """Callable that unpacks a buffer into a list using a fixed struct format."""

    def __init__(self, fmt):
        self._struct = Struct(fmt)

    def __call__(self, buf):
        """Unpack `buf` with the stored format and return the values as a list."""
        values = self._struct.unpack(buf)
        return list(values)
178
179
180
class IOBuffer(object):
    """In-memory byte buffer with a read offset and bookmark support.

    The source is copied into a bytearray. Read methods return data starting at the
    current offset and advance it; bookmarks record offsets that can be returned to.
    """

    def __init__(self, source):
        self._data = bytearray(source)
        self._offset = 0
        self.clear_marks()

    @classmethod
    def fromfile(cls, fobj):
        """Create a buffer from everything returned by ``fobj.read()``."""
        return cls(fobj.read())

    def set_mark(self):
        """Bookmark the current offset and return the bookmark's index."""
        self._bookmarks.append(self._offset)
        return len(self._bookmarks) - 1

    def jump_to(self, mark, offset=0):
        """Move to the bookmark `mark`, shifted by `offset` bytes."""
        self._offset = self._bookmarks[mark] + offset

    def offset_from(self, mark):
        """Return how far the current offset is past the bookmark `mark`."""
        return self._offset - self._bookmarks[mark]

    def clear_marks(self):
        """Discard all bookmarks."""
        self._bookmarks = []

    def splice(self, mark, newdata):
        """Jump to `mark` and replace everything from there onward with `newdata`."""
        self.jump_to(mark)
        self._data = self._data[:self._offset] + bytearray(newdata)

    def read_struct(self, struct_class):
        """Unpack `struct_class` at the current offset and advance past it."""
        struct = struct_class.unpack_from(bytearray_to_buff(self._data), self._offset)
        self.skip(struct_class.size)
        return struct

    def read_func(self, func, num_bytes=None):
        """Return ``func`` applied to the next `num_bytes` bytes, then advance."""
        # only advance if func succeeds
        res = func(self.get_next(num_bytes))
        self.skip(num_bytes)
        return res

    def read_ascii(self, num_bytes=None):
        """Read `num_bytes` bytes (default: the rest) and decode them as ASCII."""
        return self.read(num_bytes).decode('ascii')

    def read_binary(self, num, item_type='B'):
        """Read `num` items of struct code `item_type`.

        Codes containing ``'B'`` are returned as the raw bytes; anything else is
        unpacked into a list. A leading byte order character in `item_type` is
        honoured, otherwise ``'@'`` is used.
        """
        if 'B' in item_type:
            return self.read(num)

        if item_type[0] in ('@', '=', '<', '>', '!'):
            order = item_type[0]
            item_type = item_type[1:]
        else:
            order = '@'

        return list(self.read_struct(Struct(order + '%d' % num + item_type)))

    def read_int(self, code):
        """Read a single value described by the struct format `code`."""
        return self.read_struct(Struct(code))[0]

    def read(self, num_bytes=None):
        """Return the next `num_bytes` bytes (default: the rest) and advance past them."""
        res = self.get_next(num_bytes)
        # Advance by what was actually available, which may be less than requested
        self.skip(len(res))
        return res

    def get_next(self, num_bytes=None):
        """Return the next `num_bytes` bytes (default: the rest) without advancing."""
        if num_bytes is None:
            return self._data[self._offset:]
        else:
            return self._data[self._offset:self._offset + num_bytes]

    def skip(self, num_bytes):
        """Advance the offset by `num_bytes`, or to the end of the data if ``None``."""
        if num_bytes is None:
            self._offset = len(self._data)
        else:
            self._offset += num_bytes

    def check_remains(self, num_bytes):
        """Return whether exactly `num_bytes` bytes remain after the current offset."""
        return len(self._data[self._offset:]) == num_bytes

    def truncate(self, num_bytes):
        """Remove the last `num_bytes` bytes from the data."""
        # ``data[:-0]`` is ``data[:0]``, which would wipe the whole buffer, so a
        # request to remove nothing must leave the data alone.
        if num_bytes:
            self._data = self._data[:-num_bytes]

    def at_end(self):
        """Return whether the offset is at or past the end of the data."""
        return self._offset >= len(self._data)

    def __getitem__(self, item):
        return self._data[item]

    def __str__(self):
        return 'Size: {} Offset: {}'.format(len(self._data), self._offset)

    def print_next(self, num_bytes):
        """Print the next `num_bytes` bytes as space-separated hex, without advancing."""
        print(' '.join('%02x' % c for c in self.get_next(num_bytes)))

    def __len__(self):
        return len(self._data)
273
274
275
def zlib_decompress_all_frames(data):
    """Decompress every zlib frame found in a block of bytes.

    Frames are decompressed one after another until the input is used up. If
    decompression fails, the bytes not yet consumed are appended to the output
    unchanged and processing stops.

    Parameters
    ----------
    data : bytearray or bytes
        Binary data compressed using zlib.

    Returns
    -------
    bytearray
        All decompressed bytes

    """
    remaining = bytes(data)
    output = bytearray()
    while remaining:
        inflater = zlib.decompressobj()
        try:
            chunk = inflater.decompress(remaining)
        except zlib.error:
            # Not zlib data from here on; pass the rest through untouched
            output += remaining
            break
        output += chunk
        # Whatever follows the frame just finished is handled on the next pass
        remaining = inflater.unused_data
    return output
302
303
304
def bits_to_code(val):
    """Return the struct format character for an unsigned integer of `val` bits.

    8 gives ``'B'`` and 16 gives ``'H'``. Any other size logs a warning and falls
    back to ``'B'``.
    """
    if val == 16:
        return 'H'
    if val != 8:
        log.warning('Unsupported bit size: %s. Returning "B"', val)
    return 'B'
312
313
314
# For debugging
315
def hexdump(buf, num_bytes, offset=0, width=32):
    """Print a hex dump of part of a buffer.

    Each row shows the bytes as hex in blocks of 4 (missing bytes shown as ``--``),
    the absolute index of the row's first byte, that index relative to `offset`, and
    the bytes as ASCII with non-printable values replaced by ``.``.

    Parameters
    ----------
    buf : bytearray
        Data to dump; slicing and iterating it must yield integers.
    num_bytes : int
        Number of bytes to dump.
    offset : int, optional
        Index of the first byte to dump.
    width : int, optional
        Number of bytes shown per row.
    """
    hexfmt = '%02X'
    blocksize = 4
    end = offset + num_bytes
    col_width = len(str(end))

    ind = offset
    while ind < end:
        # Stop at ``end`` so the last row does not show bytes past the requested range
        chunk = buf[ind:min(ind + width, end)]
        actual_width = len(chunk)
        blocks = [hexfmt * blocksize] * (actual_width // blocksize)

        # Need to get any partial lines
        num_left = actual_width % blocksize
        if num_left:
            blocks.append(hexfmt * num_left + '--' * (blocksize - num_left))
        # Pad short rows out to the full width so the columns stay aligned
        blocks += ['--' * blocksize] * (width // blocksize - len(blocks))

        hexoutput = ' '.join(blocks)
        print(hexoutput % tuple(chunk), str(ind).ljust(col_width),
              str(ind - offset).ljust(col_width),
              # 127 (DEL) is a control character, so only 32-126 are shown as-is
              ''.join(chr(c) if 31 < c < 127 else '.' for c in chunk), sep='  ')
        ind += width
337