hexdump()   B
last analyzed

Complexity

Conditions 6

Size

Total Lines 28

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
dl 0
loc 28
rs 7.5384
c 0
b 0
f 0
1
# Copyright (c) 2009,2016 MetPy Developers.
2
# Distributed under the terms of the BSD 3-Clause License.
3
# SPDX-License-Identifier: BSD-3-Clause
4
"""A collection of general purpose tools for reading files."""
5
6
from __future__ import print_function
7
8
import bz2
9
from collections import namedtuple
10
import gzip
11
import logging
12
from struct import Struct
13
import zlib
14
15
from ..units import UndefinedUnitError, units
16
17
log = logging.getLogger(__name__)
log.setLevel(logging.WARNING)

# Early Python 2.7 releases have a Struct.unpack_from() that cannot be handed a bytearray,
# so the data get wrapped before unpacking. Python 2 offers the ``buffer`` builtin for
# this; where that name does not exist (Python 3) the lookup raises NameError and
# ``memoryview`` is kept instead, since calling bytearray again isn't cheap.
bytearray_to_buff = memoryview
try:
    bytearray_to_buff = buffer
except NameError:
    pass
28
29
30
def open_as_needed(filename):
    """Return a binary file object for a path or an already-open object.

    Anything that exposes a ``read`` attribute is treated as file-like and handed back
    unchanged. Otherwise `filename` is used as a path and its extension picks the class
    used to open it for binary reading.

    Parameters
    ----------
    filename : str or file-like object
        Path to open, or an object that already has a ``read`` attribute.

    Returns
    -------
    file-like object
        `filename` itself, a :class:`bz2.BZ2File` for ``.bz2`` paths, a
        :class:`gzip.GzipFile` for ``.gz`` paths, or a regular file otherwise.

    """
    if hasattr(filename, 'read'):
        return filename

    # Checked in order; the first matching suffix decides how the file is opened.
    for suffix, opener in (('.bz2', bz2.BZ2File), ('.gz', gzip.GzipFile)):
        if filename.endswith(suffix):
            return opener(filename, 'rb')
    return open(filename, 'rb')
45
46
47
class UnitLinker(object):
    r"""Wrap a :class:`metpy.io.cdm.Variable` and handle units.

    Converts any attached unit attribute to a :class:`pint.Unit`. It also handles converting
    data returns to be instances of :class:`pint.Quantity` rather than bare (unit-less) arrays.

    Attributes that are not found on the wrapper itself are looked up on the wrapped
    variable.

    """

    def __init__(self, var):
        r"""Construct a new :class:`UnitLinker`.

        Parameters
        ----------
        var : Variable
            The :class:`metpy.io.cdm.Variable` to be wrapped.

        """
        self._var = var
        try:
            self._unit = units(self._var.units)
        except (AttributeError, UndefinedUnitError):
            # No ``units`` attribute, or one the registry does not know: hand back bare data.
            self._unit = None

    def __getitem__(self, ind):
        """Get data from the underlying variable and add units.

        When no unit is attached, the data are returned exactly as the variable gave them.
        """
        ret = self._var[ind]
        return ret if self._unit is None else ret * self._unit

    def __getattr__(self, item):
        """Forward attribute access that fails normally onto the underlying variable.

        Raises
        ------
        AttributeError
            If the wrapper has no underlying variable yet, or the variable lacks `item`.

        """
        # __getattr__ only runs after normal lookup has failed, so being asked for '_var'
        # means the wrapper was never initialized (e.g. created through __new__ by copy or
        # pickle). Reading self._var below would then re-enter this method without end, so
        # report a plain AttributeError instead.
        if item == '_var':
            raise AttributeError(item)
        return getattr(self._var, item)

    @property
    def units(self):
        """Return the units attached to the data, or None when there are none."""
        return self._unit

    @units.setter
    def units(self, val):
        """Override the units used for returned data.

        Parameters
        ----------
        val : pint.Unit or str
            A unit instance is stored directly; anything else is passed to the unit
            registry for conversion.

        """
        if isinstance(val, units.Unit):
            self._unit = val
        else:
            self._unit = units(val)
91
92
93
class NamedStruct(Struct):
    """Decode packed bytes with :class:`Struct` and return the values as a namedtuple."""

    def __init__(self, info, prefmt='', tuple_name=None):
        """Set up the format string, the result tuple type, and any field converters.

        Parameters
        ----------
        info : sequence of tuples
            One entry per field, ``(name, format)`` or ``(name, format, converter)``.
            Entries with an empty name contribute their format but no tuple field, and
            entries with an empty format contribute nothing to the format string.
        prefmt : str, optional
            Text placed in front of the combined field formats.
        tuple_name : str, optional
            Name of the generated namedtuple type; ``'NamedStruct'`` when not given.

        """
        field_names, field_fmts = zip(*info)

        # Converters are keyed by position in the unpacked values. Each two-item entry
        # without a name shifts the positions that follow it down by one.
        self.converters = {}
        unnamed = 0
        for position, entry in enumerate(info):
            if len(entry) > 2:
                self.converters[position - unnamed] = entry[-1]
            elif not entry[0]:
                unnamed += 1

        kept_names = [name for name in field_names if name]
        self._tuple = namedtuple('NamedStruct' if tuple_name is None else tuple_name,
                                 ' '.join(kept_names))
        super(NamedStruct, self).__init__(prefmt + ''.join(fmt for fmt in field_fmts if fmt))

    def _create(self, items):
        """Run the converters over the unpacked values and build the namedtuple."""
        if not self.converters:
            return self.make_tuple(*items)

        values = list(items)
        for position, convert in self.converters.items():
            values[position] = convert(values[position])

        # Fill any fields that received no value with None.
        missing = len(self._tuple._fields) - len(values)
        if missing > 0:
            values.extend([None] * missing)
        return self.make_tuple(*values)

    def make_tuple(self, *args, **kwargs):
        """Build an instance of the result namedtuple from the given values."""
        return self._tuple(*args, **kwargs)

    def unpack(self, s):
        """Decode the bytes `s` and return them as a namedtuple."""
        raw = super(NamedStruct, self).unpack(s)
        return self._create(raw)

    def unpack_from(self, buff, offset=0):
        """Decode from `buff` starting at `offset` and return a namedtuple."""
        raw = super(NamedStruct, self).unpack_from(buff, offset)
        return self._create(raw)

    def unpack_file(self, fobj):
        """Read ``self.size`` bytes from the file object and decode them."""
        payload = fobj.read(self.size)
        return self.unpack(payload)
135
136
137
# NamedStruct cannot be used once there are more than 255 items, because of a CPython
# limit on the number of arguments; this dictionary-based variant covers that case.
class DictStruct(Struct):
    """Decode packed bytes with :class:`Struct` and return the values as a dictionary."""

    def __init__(self, info, prefmt=''):
        """Set up the format string and the list of field names.

        Parameters
        ----------
        info : sequence of tuples
            ``(name, format)`` pairs. Empty names are dropped from the result keys and
            empty formats are dropped from the format string.
        prefmt : str, optional
            Text placed in front of the combined field formats.

        """
        field_names, field_fmts = zip(*info)

        # Only entries that carry a name become keys in the result.
        self._names = list(filter(None, field_names))

        super(DictStruct, self).__init__(prefmt + ''.join(filter(None, field_fmts)))

    def _create(self, items):
        """Pair the stored names with the unpacked values."""
        return {name: value for name, value in zip(self._names, items)}

    def unpack(self, s):
        """Decode the bytes `s` and return a dict mapping field name to value."""
        raw = super(DictStruct, self).unpack(s)
        return self._create(raw)

    def unpack_from(self, buff, offset=0):
        """Decode from `buff` starting at `offset` and return a dict of field values."""
        raw = super(DictStruct, self).unpack_from(buff, offset)
        return self._create(raw)
161
162
163
class Enum(object):
    """Callable lookup table that turns values into descriptive strings."""

    def __init__(self, *args, **kwargs):
        """Build the value-to-name table.

        Positional arguments are numbered from 0 in the order given. Keyword arguments
        are given as ``name=value`` and stored inverted, so the value looks up the name.
        """
        self.val_map = dict(enumerate(args))

        # Keywords arrive as name -> value; the table needs value -> name.
        for name, value in kwargs.items():
            self.val_map[value] = name

    def __call__(self, val):
        """Return the string for `val`, or ``'Unknown (<val>)'`` when it is not mapped."""
        fallback = 'Unknown ({})'.format(val)
        return self.val_map.get(val, fallback)
177
178
179
class Bits(object):
    """Split an integer into a fixed-length list of True/False flags."""

    def __init__(self, num_bits):
        """Store the bit positions ``0 .. num_bits - 1`` to be examined."""
        self._bits = range(num_bits)

    def __call__(self, val):
        """Return one boolean per bit position, lowest bit first."""
        shifted = (val >> position for position in self._bits)
        return [bool(word & 0x1) for word in shifted]
189
190
191
class BitField(object):
    """Translate the set bits of an integer into their names."""

    def __init__(self, *names):
        """Store the bit names, given from the lowest bit upward."""
        self._names = names

    def __call__(self, val):
        """Return the names of the bits set in `val`.

        Returns None for a zero (falsy) value, the bare name when exactly one named bit
        is set, and otherwise a list of names (empty if only unnamed bits are set).
        """
        if not val:
            return None

        remaining = val
        active = []
        for name in self._names:
            # Nothing but zero bits left, so no later name can match.
            if not remaining:
                break
            if remaining & 0x1:
                active.append(name)
            remaining >>= 1

        if len(active) == 1:
            return active[0]
        return active
213
214
215
class Array(object):
    """Callable wrapper around a :class:`Struct` that unpacks bytes into a list."""

    def __init__(self, fmt):
        """Compile the struct format `fmt` used for unpacking."""
        self._struct = Struct(fmt)

    def __call__(self, buf):
        """Unpack `buf` with the stored format and return the values as a list."""
        values = self._struct.unpack(buf)
        return list(values)
225
226
227
class IOBuffer(object):
    """Holds bytes from a buffer to simplify parsing and random access.

    The data are kept in a :class:`bytearray` together with a current read offset and a
    list of marked offsets that can be jumped back to later.
    """

    def __init__(self, source):
        """Initialize the IOBuffer with the source data.

        Parameters
        ----------
        source : bytes-like
            Data handed to :class:`bytearray` to create the buffer's own storage.

        """
        self._data = bytearray(source)
        self._offset = 0
        self.clear_marks()

    @classmethod
    def fromfile(cls, fobj):
        """Initialize the IOBuffer with everything ``fobj.read()`` returns."""
        return cls(fobj.read())

    def set_mark(self):
        """Mark the current location and return its id so that the buffer can return later."""
        self._bookmarks.append(self._offset)
        return len(self._bookmarks) - 1

    def jump_to(self, mark, offset=0):
        """Move to a previously set mark, plus an optional extra `offset` in bytes."""
        self._offset = self._bookmarks[mark] + offset

    def offset_from(self, mark):
        """Calculate the current offset relative to a marked location."""
        return self._offset - self._bookmarks[mark]

    def clear_marks(self):
        """Clear all marked locations."""
        self._bookmarks = []

    def splice(self, mark, newdata):
        """Replace the data after the marked location with the specified data.

        The current offset is left at the marked location.
        """
        self.jump_to(mark)
        self._data = self._data[:self._offset] + bytearray(newdata)

    def read_struct(self, struct_class):
        """Parse and return a structure from the current buffer offset.

        `struct_class` must provide ``unpack_from(buffer, offset)`` and ``size``; the
        offset advances by ``size`` after a successful parse.
        """
        struct = struct_class.unpack_from(bytearray_to_buff(self._data), self._offset)
        self.skip(struct_class.size)
        return struct

    def read_func(self, func, num_bytes=None):
        """Parse data from the current buffer offset using a function.

        `func` receives the next `num_bytes` bytes (all remaining bytes when None) and its
        result is returned.
        """
        # only advance if func succeeds
        res = func(self.get_next(num_bytes))
        self.skip(num_bytes)
        return res

    def read_ascii(self, num_bytes=None):
        """Return the specified bytes as ascii-formatted text."""
        return self.read(num_bytes).decode('ascii')

    def read_binary(self, num, item_type='B'):
        """Parse `num` items of the given struct code from the current offset.

        Any `item_type` containing ``'B'`` returns the next `num` raw bytes. Other codes
        are unpacked with :class:`Struct` and returned as a list; a leading byte-order
        character in `item_type` is honored, otherwise native order (``'@'``) is used.
        """
        if 'B' in item_type:
            return self.read(num)

        if item_type[0] in ('@', '=', '<', '>', '!'):
            order = item_type[0]
            item_type = item_type[1:]
        else:
            order = '@'

        return list(self.read_struct(Struct(order + '{:d}'.format(int(num)) + item_type)))

    def read_int(self, code):
        """Parse the current buffer offset as the specified integer code."""
        return self.read_struct(Struct(code))[0]

    def read(self, num_bytes=None):
        """Read and return the specified bytes from the buffer.

        The offset advances by the number of bytes actually returned, which can be fewer
        than requested near the end of the data.
        """
        res = self.get_next(num_bytes)
        self.skip(len(res))
        return res

    def get_next(self, num_bytes=None):
        """Get the next bytes in the buffer without modifying the offset.

        With `num_bytes` of None, everything from the offset to the end is returned.
        """
        if num_bytes is None:
            return self._data[self._offset:]
        else:
            return self._data[self._offset:self._offset + num_bytes]

    def skip(self, num_bytes):
        """Jump ahead the specified bytes in the buffer, or to the end when given None."""
        if num_bytes is None:
            self._offset = len(self._data)
        else:
            self._offset += num_bytes

    def check_remains(self, num_bytes):
        """Check that exactly the number of bytes specified remains in the buffer."""
        return len(self._data[self._offset:]) == num_bytes

    def truncate(self, num_bytes):
        """Remove the specified number of bytes from the end of the buffer.

        Removing zero bytes leaves the buffer unchanged. The current offset and any marks
        are not adjusted.
        """
        # [:-0] is the same slice as [:0], which would discard every byte; removing
        # nothing has to leave the data alone.
        if num_bytes != 0:
            self._data = self._data[:-num_bytes]

    def at_end(self):
        """Return whether the buffer has reached the end of data."""
        return self._offset >= len(self._data)

    def __getitem__(self, item):
        """Return the data at the specified location."""
        return self._data[item]

    def __str__(self):
        """Return a string representation of the IOBuffer."""
        return 'Size: {} Offset: {}'.format(len(self._data), self._offset)

    def __len__(self):
        """Return the amount of data in the buffer."""
        return len(self._data)
340
341
342
def zlib_decompress_all_frames(data):
    """Decompress every zlib frame found back-to-back in `data`.

    Each frame is decompressed in turn, continuing with whatever bytes the previous frame
    left unused. As soon as the remaining bytes fail to decompress they are appended to
    the output unchanged and processing stops.

    Parameters
    ----------
    data : bytearray or bytes
        Binary data compressed using zlib.

    Returns
    -------
    bytearray
        All decompressed bytes, followed by any trailing bytes that were not zlib data.

    """
    remaining = bytes(data)
    output = bytearray()
    while remaining:
        # A decompressor handles a single frame, so each pass needs a new one.
        decompressor = zlib.decompressobj()
        try:
            chunk = decompressor.decompress(remaining)
        except zlib.error:
            output += remaining
            break
        output += chunk
        remaining = decompressor.unused_data
    return output
370
371
372
def bits_to_code(val):
    """Return the struct format code matching a bit width.

    Widths of 8 and 16 give ``'B'`` and ``'H'``. Any other width logs a warning and
    falls back to ``'B'``.
    """
    if val == 16:
        return 'H'
    if val != 8:
        log.warning('Unsupported bit size: %s. Returning "B"', val)
    return 'B'
381
382
383
# For debugging
def hexdump(buf, num_bytes, offset=0, width=32):
    """Perform a hexdump of the buffer.

    Each output row holds, separated by two spaces: the bytes as uppercase hex in groups
    of four bytes (with ``--`` standing in for bytes missing from a short row), the
    absolute index of the row's first byte, that index relative to `offset`, and the
    bytes as text with non-printable values shown as ``.``.

    Parameters
    ----------
    buf : bytearray or other sliceable sequence of int
        Data to dump.
    num_bytes : int
        Number of bytes to cover, starting at `offset`. Rows are always read `width`
        bytes at a time, so the final row can show bytes beyond ``offset + num_bytes``
        when they are present in `buf`.
    offset : int, optional
        Index of the first byte to dump.
    width : int, optional
        Number of bytes per output row.

    Returns
    -------
    str
        The rows of the hexdump joined by newlines.

    """
    hexfmt = '{:02X}'
    filler = '--'
    blocksize = 4
    blocks_per_row = width // blocksize

    end = offset + num_bytes
    # Both index columns are padded to the width of the largest index.
    index_width = len(str(end))

    lines = []
    for ind in range(offset, end, width):
        chunk = buf[ind:ind + width]
        full_blocks, num_left = divmod(len(chunk), blocksize)
        blocks = [hexfmt * blocksize] * full_blocks

        # Need to get any partial block, then pad the row out to its full width
        if num_left:
            blocks.append(hexfmt * num_left + filler * (blocksize - num_left))
        blocks.extend([filler * blocksize] * (blocks_per_row - len(blocks)))

        hexoutput = ' '.join(blocks).format(*chunk)
        # Only 0x20-0x7E are printable ASCII; DEL (0x7F) is a control character.
        text = ''.join(chr(c) if 32 <= c < 127 else '.' for c in chunk)
        lines.append('  '.join((hexoutput, str(ind).ljust(index_width),
                                str(ind - offset).ljust(index_width), text)))
    return '\n'.join(lines)
412