Completed
Pull Request — master (#348)
by
unknown
01:22
created

interpolate_nans()   A

Complexity

Conditions 1

Size

Total Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 10
rs 9.4285
1
# Copyright (c) 2008-2015 MetPy Developers.
2
# Distributed under the terms of the BSD 3-Clause License.
3
# SPDX-License-Identifier: BSD-3-Clause
4
"""A collection of general purpose tools for reading files."""
5
6
from __future__ import print_function
7
8
from collections import namedtuple
9
import logging
10
import numpy as np
11
from struct import Struct
12
import zlib
13
14
from ..units import UndefinedUnitError, units
15
16
log = logging.getLogger(__name__)
log.setLevel(logging.WARNING)

# Early Python 2.7 releases have a Struct.unpack_from() that cannot be handed a bytearray, so
# the data gets wrapped first. Python 2 supplies ``buffer`` for this; where that name does not
# exist (Python 3), fall back to ``memoryview``, since calling bytearray again isn't cheap.
try:
    buffer
except NameError:
    bytearray_to_buff = memoryview
else:
    bytearray_to_buff = buffer
27
28
29
class UnitLinker(object):
    r"""Wrap a :class:`metpy.io.cdm.Variable` and handle units.

    Converts any attached unit attribute to a :class:`pint.Unit`. It also handles converting
    data returns to be instances of :class:`pint.Quantity` rather than bare (unit-less) arrays.
    """

    def __init__(self, var):
        r"""Construct a new :class:`UnitLinker`.

        Parameters
        ----------
        var : Variable
            The :class:`metpy.io.cdm.Variable` to be wrapped.
        """
        self._var = var
        try:
            self._unit = units(self._var.units)
        except (AttributeError, UndefinedUnitError):
            # No ``units`` attribute on the variable, or one that cannot be parsed:
            # data is then handed back without any unit attached.
            self._unit = None

    def __getitem__(self, ind):
        """Get data from the underlying variable and add units."""
        ret = self._var[ind]
        return ret if self._unit is None else ret * self._unit

    def __getattr__(self, item):
        """Forward all attribute access onto underlying variable.

        Raises
        ------
        AttributeError
            If the wrapped variable lacks the attribute, or if no variable has been attached
            to this instance yet.
        """
        # __getattr__ only runs when normal lookup fails. If ``_var`` itself is what is
        # missing (e.g. an instance created without __init__ while being copied or
        # unpickled), forwarding would look up ``self._var`` again and recurse forever.
        if item == '_var':
            raise AttributeError(item)
        return getattr(self._var, item)

    @property
    def units(self):
        """Access the units from the underlying variable as a :class:`pint.Quantity`."""
        return self._unit

    @units.setter
    def units(self, val):
        """Override the units on the underlying variable."""
        # Inside the method body ``units`` resolves to the module-level name, not to this
        # property.
        if isinstance(val, units.Unit):
            self._unit = val
        else:
            self._unit = units(val)
71
72
73
class NamedStruct(Struct):
    """Parse bytes using :class:`Struct` but provide named fields.

    Each entry of ``info`` is a sequence ``(name, format)`` or ``(name, format, converter)``.
    Entries with an empty name only contribute to the format string; entries with an empty
    format only contribute a field name, which is filled with ``None`` when unpacking.
    """

    def __init__(self, info, prefmt='', tuple_name=None):
        """Initialize the NamedStruct.

        Parameters
        ----------
        info : sequence
            Field descriptions, as described on the class.
        prefmt : str, optional
            Text placed in front of the joined field formats (e.g. a byte-order character).
        tuple_name : str, optional
            Name for the generated namedtuple type; defaults to ``'NamedStruct'``.
        """
        if tuple_name is None:
            tuple_name = 'NamedStruct'

        # Pick name and format out of each entry individually. Transposing with zip(*info)
        # breaks when every entry also carries a converter, because it then produces three
        # sequences instead of two.
        names = [i[0] for i in info]
        fmts = [i[1] for i in info]

        # Converters are keyed by position in the unpacked values, so unnamed entries seen
        # so far are subtracted from the entry index.
        self.converters = {}
        conv_off = 0
        for ind, i in enumerate(info):
            if len(i) > 2:
                self.converters[ind - conv_off] = i[-1]
            elif not i[0]:  # Skip items with no name
                conv_off += 1
        self._tuple = namedtuple(tuple_name, ' '.join(n for n in names if n))
        super(NamedStruct, self).__init__(prefmt + ''.join(f for f in fmts if f))

    def _create(self, items):
        """Run the converters over the unpacked values and build the namedtuple."""
        items = list(items)
        for ind, conv in self.converters.items():
            items[ind] = conv(items[ind])

        # Fields without a format have no unpacked value; fill them with None whether or
        # not any converters are registered.
        missing = len(self._tuple._fields) - len(items)
        if missing > 0:
            items.extend([None] * missing)
        return self.make_tuple(*items)

    def make_tuple(self, *args, **kwargs):
        """Construct the underlying tuple from values."""
        return self._tuple(*args, **kwargs)

    def unpack(self, s):
        """Parse bytes and return a namedtuple."""
        return self._create(super(NamedStruct, self).unpack(s))

    def unpack_from(self, buff, offset=0):
        """Read bytes from a buffer and return as a namedtuple."""
        return self._create(super(NamedStruct, self).unpack_from(buff, offset))

    def unpack_file(self, fobj):
        """Unpack the next bytes from a file object."""
        return self.unpack(fobj.read(self.size))
115
116
117
# This works around times when we have more than 255 items and can't use
118
# NamedStruct. This is a CPython limit for arguments.
119
class DictStruct(Struct):
    """Unpack bytes with :class:`Struct` and hand back the fields as a dictionary."""

    def __init__(self, info, prefmt=''):
        """Set up the field names and the combined format string.

        ``info`` is a sequence of ``(name, format)`` pairs; ``prefmt`` is placed in front
        of the joined formats.
        """
        field_names, field_fmts = zip(*info)

        # Entries lacking a name add to the format only, so they get no dictionary key
        self._names = list(filter(None, field_names))

        full_fmt = prefmt + ''.join(filter(None, field_fmts))
        super(DictStruct, self).__init__(full_fmt)

    def _create(self, items):
        """Pair the stored names with the unpacked values."""
        pairs = zip(self._names, items)
        return dict(pairs)

    def unpack(self, s):
        """Parse bytes and return a dictionary mapping field name to value."""
        values = super(DictStruct, self).unpack(s)
        return self._create(values)

    def unpack_from(self, buff, offset=0):
        """Parse bytes from a buffer, starting at ``offset``, and return a dictionary."""
        values = super(DictStruct, self).unpack_from(buff, offset)
        return self._create(values)
141
142
143
class Enum(object):
    """Translate values into descriptive strings."""

    def __init__(self, *args, **kwargs):
        """Build the value-to-name table.

        Positional arguments are names for the values 0, 1, 2, ...; keyword arguments are
        given as ``name=value``.
        """
        # Positional names are numbered by their position, starting from 0
        self.val_map = dict(enumerate(args))

        # Keywords arrive as name=value; store them flipped so lookup goes value -> name
        self.val_map.update((value, name) for name, value in kwargs.items())

    def __call__(self, val):
        """Return the name registered for ``val``, or an ``'Unknown (...)'`` placeholder."""
        try:
            return self.val_map[val]
        except KeyError:
            return 'Unknown ({})'.format(val)
160
161
162
class Bits(object):
    """Split an integer into a fixed count of True/False flags, lowest bit first."""

    def __init__(self, num_bits):
        """Remember which bit positions (0 up to ``num_bits - 1``) to extract."""
        self._bits = range(num_bits)

    def __call__(self, val):
        """Return a list holding one boolean per bit position of ``val``."""
        flags = []
        for shift in self._bits:
            flags.append(bool((val >> shift) & 0x1))
        return flags
172
173
174
class BitField(object):
    """Turn the set bits of an integer into their assigned names."""

    def __init__(self, *names):
        """Store the names; the first one belongs to the lowest bit."""
        self._names = names

    def __call__(self, val):
        """Return the names of the bits set in ``val``.

        Gives ``None`` for a false-y ``val``, the bare name when exactly one named bit is
        set, and a list of names (possibly empty) otherwise.
        """
        if not val:
            return None

        remaining = val
        selected = []
        for name in self._names:
            if remaining & 0x1:
                selected.append(name)
            remaining >>= 1
            # Nothing left but zero bits, no need to look at further names
            if not remaining:
                break

        # A lone match is unwrapped; zero or several matches stay as a list
        if len(selected) == 1:
            return selected[0]
        return selected
196
197
198
class Array(object):
    """Callable wrapper around a Struct that unpacks bytes into a list of values."""

    def __init__(self, fmt):
        """Compile ``fmt`` into the Struct used for unpacking."""
        self._struct = Struct(fmt)

    def __call__(self, buf):
        """Unpack ``buf`` and return the values as a list."""
        values = self._struct.unpack(buf)
        return list(values)
208
209
210
class IOBuffer(object):
    """Holds bytes from a buffer to simplify parsing and random access.

    The data is kept as a ``bytearray`` together with a current read offset and a list of
    bookmarked offsets.
    """

    def __init__(self, source):
        """Initialize the IOBuffer with the source data."""
        self._data = bytearray(source)
        self._offset = 0
        self.clear_marks()

    @classmethod
    def fromfile(cls, fobj):
        """Initialize the IOBuffer with the contents of the file object."""
        return cls(fobj.read())

    def set_mark(self):
        """Mark the current location and return its id so that the buffer can return later."""
        self._bookmarks.append(self._offset)
        return len(self._bookmarks) - 1

    def jump_to(self, mark, offset=0):
        """Jump to a previously set mark, optionally shifted by ``offset`` bytes."""
        self._offset = self._bookmarks[mark] + offset

    def offset_from(self, mark):
        """Calculate the current offset relative to a marked location."""
        return self._offset - self._bookmarks[mark]

    def clear_marks(self):
        """Clear all marked locations."""
        self._bookmarks = []

    def splice(self, mark, newdata):
        """Replace the data after the marked location with the specified data.

        The read offset is left at the marked location.
        """
        self.jump_to(mark)
        self._data = self._data[:self._offset] + bytearray(newdata)

    def read_struct(self, struct_class):
        """Parse and return a structure from the current buffer offset.

        ``struct_class`` needs an ``unpack_from(buffer, offset)`` method and a ``size``
        attribute; the offset is advanced by ``size`` afterwards.
        """
        struct = struct_class.unpack_from(bytearray_to_buff(self._data), self._offset)
        self.skip(struct_class.size)
        return struct

    def read_func(self, func, num_bytes=None):
        """Parse data from the current buffer offset using a function.

        ``func`` is given the next ``num_bytes`` bytes (everything remaining when None).
        """
        # only advance if func succeeds
        res = func(self.get_next(num_bytes))
        self.skip(num_bytes)
        return res

    def read_ascii(self, num_bytes=None):
        """Return the specified bytes as ascii-formatted text."""
        return self.read(num_bytes).decode('ascii')

    def read_binary(self, num, item_type='B'):
        """Parse the current buffer offset as the specified code.

        For a code containing ``'B'`` the next ``num`` raw bytes are returned; otherwise
        ``num`` items of ``item_type`` are unpacked and returned as a list.
        """
        if 'B' in item_type:
            return self.read(num)

        # Separate an explicit byte-order character from the item code; default to native
        if item_type[0] in ('@', '=', '<', '>', '!'):
            order = item_type[0]
            item_type = item_type[1:]
        else:
            order = '@'

        return list(self.read_struct(Struct(order + '{:d}'.format(int(num)) + item_type)))

    def read_int(self, code):
        """Parse the current buffer offset as the specified integer code."""
        return self.read_struct(Struct(code))[0]

    def read(self, num_bytes=None):
        """Read and return the specified bytes from the buffer."""
        res = self.get_next(num_bytes)
        # Advance by what was actually returned, which may be less than requested
        self.skip(len(res))
        return res

    def get_next(self, num_bytes=None):
        """Get the next bytes in the buffer without modifying the offset."""
        if num_bytes is None:
            return self._data[self._offset:]
        else:
            return self._data[self._offset:self._offset + num_bytes]

    def skip(self, num_bytes):
        """Jump ahead the specified number of bytes; None jumps to the end of the data."""
        if num_bytes is None:
            self._offset = len(self._data)
        else:
            self._offset += num_bytes

    def check_remains(self, num_bytes):
        """Check that the number of bytes specified remains in the buffer."""
        return len(self._data[self._offset:]) == num_bytes

    def truncate(self, num_bytes):
        """Remove the specified number of bytes from the end of the buffer."""
        # ``[:-0]`` is ``[:0]`` and would throw away everything, so removing zero bytes
        # has to leave the data alone.
        if num_bytes:
            self._data = self._data[:-num_bytes]

    def at_end(self):
        """Return whether the buffer has reached the end of data."""
        return self._offset >= len(self._data)

    def __getitem__(self, item):
        """Return the data at the specified location."""
        return self._data[item]

    def __str__(self):
        """Return a string representation of the IOBuffer."""
        return 'Size: {} Offset: {}'.format(len(self._data), self._offset)

    def __len__(self):
        """Return the amount of data in the buffer."""
        return len(self._data)
323
324
325
def zlib_decompress_all_frames(data):
    """Decompress a sequence of back-to-back zlib streams.

    Each pass decompresses one stream from the front of the remaining bytes and carries on
    with whatever that stream left unused. As soon as the remaining bytes fail to
    decompress, they are appended to the output unchanged and processing stops.

    Parameters
    ----------
    data : bytearray or bytes
        Binary data compressed using zlib.

    Returns
    -------
    bytearray
        All decompressed bytes, followed by any trailing bytes that could not be
        decompressed.
    """
    remaining = bytes(data)
    output = bytearray()
    while remaining:
        # A decompressor handles a single stream, so start a fresh one every pass
        inflater = zlib.decompressobj()
        try:
            chunk = inflater.decompress(remaining)
        except zlib.error:
            output += remaining
            break
        output += chunk
        remaining = inflater.unused_data
    return output
352
353
354
def bits_to_code(val):
    """Return the struct format code for an unsigned value that is ``val`` bits wide.

    Only 8 (``'B'``) and 16 (``'H'``) are known; anything else logs a warning and gives
    ``'B'``.
    """
    known_sizes = ((8, 'B'), (16, 'H'))
    for size, code in known_sizes:
        if val == size:
            return code

    log.warning('Unsupported bit size: %s. Returning "B"', val)
    return 'B'
363
364
365
# For debugging
366
def hexdump(buf, num_bytes, offset=0, width=32):
    """Perform a hexdump of the buffer.

    Every output line shows up to ``width`` bytes as two-digit hex values in groups of four,
    then the absolute offset of the line, the offset relative to ``offset``, and the bytes
    as ASCII text with non-printable values shown as ``'.'``. Positions on a line that hold
    no data are filled with ``--``.

    Parameters
    ----------
    buf : sliceable sequence of byte values
        The data to dump; slicing it must give an iterable of integers.
    num_bytes : int
        How many bytes to dump.
    offset : int, optional
        Where in ``buf`` to start.
    width : int, optional
        Number of bytes shown per line.

    Returns
    -------
    str
        The hexdump as a canonically-formatted string.
    """
    hexfmt = '{:02X}'
    blocksize = 4
    end = offset + num_bytes
    # Both offset columns are padded to the width of the largest offset
    col_width = len(str(end))

    lines = []
    ind = offset
    while ind < end:
        # Clamp to the requested range so a partial last line does not spill past it
        chunk = buf[ind:min(ind + width, end)]
        actual_width = len(chunk)
        blocks = [hexfmt * blocksize for _ in range(actual_width // blocksize)]

        # Need to get any partial lines
        num_left = actual_width % blocksize
        if num_left:
            blocks += [hexfmt * num_left + '--' * (blocksize - num_left)]
        blocks += ['--' * blocksize] * (width // blocksize - len(blocks))

        hexoutput = ' '.join(blocks)
        printable = tuple(chunk)
        # 127 (DEL) is a control character, so the printable range stops at 126
        text = ''.join(chr(c) if 31 < c < 127 else '.' for c in chunk)
        lines.append('  '.join((hexoutput.format(*printable), str(ind).ljust(col_width),
                                str(ind - offset).ljust(col_width), text)))
        ind += width
    return '\n'.join(lines)
394
395
396
def interpolate_nans(x, y):
    """Interpolate NaN values in y.

    The points are sorted by ``x`` for the interpolation and the result is returned in the
    original order of the inputs.

    Parameters
    ----------
    x : array-like
        Coordinates of the points; need not be sorted. Must support indexing with an
        integer array.
    y : array-like
        Values at ``x``, with NaN marking the entries to fill in. Must support indexing
        with an integer array.

    Returns
    -------
    array-like
        A new array with the values of ``y`` in their original order, where every NaN is
        replaced using :func:`numpy.interp` on the non-NaN points.
    """
    sort_order = np.argsort(x)
    x = x[sort_order]
    # For NumPy arrays, indexing with an integer array gives a copy, so the assignment
    # below does not write into the caller's array.
    y = y[sort_order]
    nans = np.isnan(y)
    y[nans] = np.interp(x[nans], x[~nans], y[~nans])

    # Undo the sort. This needs the inverse permutation (the argsort of the sort order);
    # indexing with the sort order a second time only restores the input order in special
    # cases.
    return y[np.argsort(sort_order)]
406