patty.make_las_header()   B
last analyzed

Complexity

Conditions 5

Size

Total Lines 54

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 19
CRAP Score 5.6503
Metric Value
cc 5
dl 0
loc 54
ccs 19
cts 27
cp 0.7037
crap 5.6503
rs 8.4156

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
'''
2
Pointcloud functions for reading/writing LAS files, and functions for dealing
3
with the spatial reference system.
4
'''
5
6 1
from __future__ import print_function
7 1
import liblas
8 1
import pcl
9 1
import os
10 1
import numpy as np
11 1
import time
12 1
from patty.srs import force_srs, is_registered
13
14 1
from sklearn.decomposition import PCA
15
16
17 1
def _check_readable(filepath):
18
    """
19
    Test whether filepath is readable, raises IOError otherwise
20
    """
21 1
    with open(filepath):
22 1
        pass
23
24
25 1
def _check_writable(filepath):
26
    """
27
    Test whether filepath is writable, raises IOError otherwise
28
    """
29
    # either the path exists but is not writable, or the path does not exist
30
    # and the parent is not writable.
31 1
    filepath = os.path.abspath(filepath)
32 1
    if (os.path.exists(filepath) and (
33
            not os.path.isfile(filepath) or
34
            not os.access(filepath, os.W_OK)
35
            )) or not os.access(os.path.dirname(filepath), os.W_OK | os.X_OK):
36
        raise IOError("Cannot save to " + filepath)
37
38
39 1
def clone(pc):
40
    """
41
    Return a copy of a pointcloud, including registration metadata
42
43
    Arguments:
44
        pc: pcl.PointCloud()
45
    Returns:
46
        cp: pcl.PointCloud()
47
    """
48
49
    cp = pcl.PointCloud(np.asarray(pc))
50
    if is_registered(pc):
51
        force_srs(cp, same_as=pc)
52
53
    return cp
54
55
56 1
def load(path, format=None, load_rgb=True):
57
    """
58
    Read a pointcloud file.
59
60
    Supports LAS and CSV files, and lets PCD and PLY files be
61
    read by python-pcl.
62
63
    Arguments:
64
        path : string
65
            Filename.
66
        format : string, optional
67
            File format: "PLY", "PCD", "LAS", "CSV",
68
            or None to detect the format from the file extension.
69
        load_rgb : bool
70
            Whether RGB is loaded for PLY and PCD files. For LAS files, RGB is
71
            always read.
72
73
    Returns:
74
        pc : pcl.PointCloud
75
    """
76 1
    if format == 'las' or format is None and path.endswith('.las'):
77 1
        pc = _load_las(path)
78 1
    elif format == 'las' or format is None and path.endswith('.csv'):
79
        pc = _load_csv(path)
80
    else:
81 1
        _check_readable(path)
82 1
        pc = pcl.load(path, format=format, loadRGB=load_rgb)
83
84 1
    return pc
85
86
87 1
def save(cloud, path, format=None, binary=False, las_header=None):
88
    """Save a pointcloud to file.
89
90
    Supports LAS and CSV files, and lets PCD and PLY
91
    files be saved by python-pcl.
92
93
    Arguments:
94
        cloud : pcl.PointCloud or pcl.PointCloudXYZRGB
95
            Pointcloud to save.
96
        path : string
97
            Filename.
98
        format : string
99
            File format: "PLY", "PCD", "LAS", "CSV",
100
            or None to detect the format from the file extension.
101
        binary : boolean
102
            Whether PLY and PCD files are saved in binary format.
103
        las_header: liblas.header.Header
104
            LAS header to use. When none, a default header is created by
105
            make_las_header(). Default: None
106
    """
107 1
    if format == 'las' or format is None and path.endswith('.las'):
108 1
        _save_las(path, cloud, header=las_header)
109 1
    elif format == 'csv' or format is None and path.endswith('.csv'):
110
        _save_csv(path, cloud)
111
    else:
112 1
        _check_writable(path)
113 1
        if is_registered(cloud) and cloud.offset != np.zeros(3):
114
            cloud_array = np.asarray(cloud)
115
            cloud_array += cloud.offset
116 1
        pcl.save(cloud, path, format=format, binary=binary)
117
118
119 1
def _load_las(lasfile):
120
    """Read a LAS file
121
122
    Returns:
123
        registered pointcloudxyzrgb
124
125
    The pointcloud has color and XYZ coordinates, and the offset and precision
126
    set.
127
    """
128 1
    _check_readable(lasfile)
129
130 1
    las = None
131 1
    try:
132 1
        las = liblas.file.File(lasfile)
133 1
        lsrs = las.header.get_srs()
134 1
        lsrs = lsrs.get_wkt()
135
136 1
        n_points = las.header.get_count()
137 1
        precise_points = np.zeros((n_points, 6), dtype=np.float64)
138
139 1
        for i, point in enumerate(las):
140 1
            precise_points[i] = (point.x, point.y, point.z,
141
                                 point.color.red / 256,
142
                                 point.color.green / 256,
143
                                 point.color.blue / 256)
144
145
        # reduce the offset to decrease floating point errors
146 1
        bbox = BoundingBox(points=precise_points[:, 0:3])
147 1
        center = bbox.center
148 1
        precise_points[:, 0:3] -= center
149
150 1
        pointcloud = pcl.PointCloudXYZRGB(precise_points.astype(np.float32))
151 1
        force_srs(pointcloud, srs=lsrs, offset=center)
152
153
    finally:
154 1
        if las is not None:
155 1
            las.close()
156
157 1
    return pointcloud
158
159
160 1
def _load_csv(path, delimiter=','):
161
    """
162
    Load a set of points from a CSV file as a pointcloud
163
164
    Returns:
165
        pc : pcl.PointCloud
166
    """
167
    precise_points = np.genfromtxt(path, delimiter=delimiter, dtype=np.float64)
168
    offset = np.mean(precise_points, axis=0, dtype=np.float64)
169
    pc = pcl.PointCloud(np.array(precise_points - offset, dtype=np.float32))
170
171
    force_srs(pc, offset=offset)
172
    return pc
173
174
175 1
def _save_csv(path, pc, delimiter=', '):
176
    """
177
    Write a pointcloud to a CSV file.
178
179
    Arguments:
180
        path: string
181
            Output filename
182
        pc: pcl.PointCloud
183
            Pointcloud to save
184
        delimiter: string
185
            Field delimiter to use, see np.savetxt documentation.
186
187
    """
188
    if not hasattr(pc, 'offset'):
189
        offset = np.zeros(3)
190
    else:
191
        offset = pc.offset
192
193
    np.savetxt(path, np.asarray(pc) + offset, delimiter=delimiter)
194
195
196 1
def extract_mask(pointcloud, mask):
197
    """Extract all points in a mask into a new pointcloud.
198
199
    Arguments:
200
        pointcloud : pcl.PointCloud
201
            Input pointcloud.
202
        mask : numpy.ndarray of bool
203
            mask for which points from the pointcloud to include.
204
    Returns:
205
        pointcloud with the same registration (if any) as the original one."""
206 1
    pointcloud_new = pointcloud.extract(np.where(mask)[0])
207 1
    if is_registered(pointcloud):
208
        force_srs(pointcloud_new, same_as=pointcloud)
209 1
    return pointcloud_new
210
211
212 1
def make_las_header(pointcloud):
213
    """Make a LAS header for given pointcloud.
214
215
    If the pointcloud is registered, this is taken into account for the
216
    header metadata.
217
218
    LAS rounds the coordinates on writing; this is controlled via the
219
    'precision' attribute of the input pointcloud. By default this is
220
    0.01 in units of the projection.
221
222
    Arguments:
223
        pointcloud : pcl.PointCloud
224
            Input pointcloud.
225
    Returns:
226
        header : liblas.header.Header
227
            Header for writing the pointcloud to a LAS file.
228
    """
229 1
    schema = liblas.schema.Schema()
230 1
    schema.time = False
231 1
    schema.color = True
232
233
    # FIXME: this format version assumes color is present
234 1
    head = liblas.header.Header()
235 1
    head.schema = schema
236 1
    head.dataformat_id = 3
237 1
    head.major_version = 1
238 1
    head.minor_version = 2
239
240 1
    if is_registered(pointcloud):
241
        try:
242
            lsrs = liblas.srs.SRS()
243
            lsrs.set_wkt(pointcloud.srs.ExportToWkt())
244
            head.set_srs(lsrs)
245
        except liblas.core.LASException:
246
            pass
247
248 1
    if hasattr(pointcloud, 'offset'):
249
        head.offset = pointcloud.offset
250
    else:
251 1
        head.offset = np.zeros(3)
252
253
    # FIXME: need extra precision to reduce floating point errors. We don't
254
    # know exactly why this works. It might reduce precision on the top of
255
    # the float, but reduces an error of one bit for the last digit.
256 1
    if not hasattr(pointcloud, 'precision'):
257 1
        precision = np.array([0.01, 0.01, 0.01], dtype=np.float64)
258
    else:
259
        precision = np.array(pointcloud.precision, dtype=np.float64)
260 1
    head.scale = precision * 0.5
261
262 1
    pc_array = np.asarray(pointcloud)
263 1
    head.min = pc_array.min(axis=0) + head.offset
264 1
    head.max = pc_array.max(axis=0) + head.offset
265 1
    return head
266
267
268 1
def _save_las(lasfile, pointcloud, header=None):
269
    """Write a pointcloud to a LAS file
270
271
    Arguments:
272
        lasfile : string
273
            Filename.
274
275
        pointcloud : pcl.PointCloud
276
277
        header : liblas.header.Header, optional
278
            See :func:`make_las_header`. If not given, makes a header using
279
            that function with default settings.
280
    """
281 1
    _check_writable(lasfile)
282
283 1
    if header is None:
284 1
        header = make_las_header(pointcloud)
285
286
    # deal with color
287 1
    if len(pointcloud[0]) > 3:
288 1
        do_rgb = True
289
    else:
290 1
        do_rgb = False
291
292 1
    precise_points = np.array(pointcloud, dtype=np.float64)
293 1
    precise_points /= header.scale
294
295 1
    las = None
296 1
    try:
297 1
        las = liblas.file.File(lasfile, mode="w", header=header)
298
299 1
        for i in xrange(pointcloud.size):
300 1
            point = liblas.point.Point()
301 1
            point.x, point.y, point.z = precise_points[i]
302 1
            if do_rgb:
303 1
                red, grn, blu = pointcloud[i][3:6]
304 1
                point.color = liblas.color.Color(
305
                    red=int(red) * 256,
306
                    green=int(grn) * 256,
307
                    blue=int(blu) * 256)
308 1
            las.write(point)
309
    finally:
310 1
        if las is not None:
311 1
            las.close()
312
313
314 1
class BoundingBox(object):
315
    '''A bounding box for a sequence of points.
316
317
    Center, size and diagonal are updated when the minimum or maximum are
318
    updated.
319
320
    Constructor usage: either set points (any object that is converted to an
321
    NxD array by np.asarray, with D the number of dimensions) or a fixed min
322
    and max.
323
    '''
324
325 1
    def __init__(self, points=None, min=None, max=None):
326 1
        if min is not None and max is not None:
327
            self._min = np.asarray(min, dtype=np.float64)
328
            self._max = np.asarray(max, dtype=np.float64)
329 1
        elif points is not None:
330 1
            points_array = np.asarray(points)
331 1
            self._min = points_array.min(axis=0)
332 1
            self._max = points_array.max(axis=0)
333
        else:
334
            raise TypeError("Need to give min and max or matrix")
335
336 1
        self._reset()
337
338 1
    def __str__(self):
339
        return 'BoundingBox <%s - %s>' % (self.min, self.max)
340
341 1
    def _reset(self):
342 1
        self._center = None
343 1
        self._size = None
344
345 1
    @property
346
    def min(self):
347 1
        return self._min
348
349 1
    @min.setter
350
    def min(self, new_min):
351
        self._reset()
352
        self.min = new_min
353
354 1
    @property
355
    def max(self):
356 1
        return self._max
357
358 1
    @max.setter
359
    def max(self, new_max):
360
        self._reset()
361
        self.max = new_max
362
363 1
    @property
364
    def center(self):
365
        ''' Center point of the bounding box'''
366 1
        if self._center is None:
367 1
            self._center = (self.min + self.max) / 2.0
368 1
        return self._center
369
370 1
    @property
371
    def size(self):
372
        ''' N-dimensional size array '''
373
        if self._size is None:
374
            self._size = self.max - self.min
375
        return self._size
376
377 1
    @property
378
    def diagonal(self):
379
        ''' Length of the diagonal of the box. '''
380
        return np.linalg.norm(self.size)
381
382 1
    def contains(self, pos):
383
        ''' Whether the bounding box contains given position. '''
384
        return np.all((pos[0:3] >= self.min) & (pos[0:3] <= self.max))
385
386
387 1
def log(*args, **kwargs):
388
    """Simple logging function that prints to stdout"""
389
    print(time.strftime("[%F %H:%M:%S]", time.gmtime()), *args, **kwargs)
390
391
392 1
def measure_length(pointcloud):
393
    """Returns the length of a point cloud in its longest direction."""
394 1
    if len(pointcloud) < 2:
395 1
        return 0
396
397 1
    pca = PCA(n_components=1)
398 1
    pc_array = np.asarray(pointcloud)
399 1
    pca.fit(pc_array)
400 1
    primary_axis = np.dot(pc_array, np.transpose(pca.components_))[:, 0]
401 1
    return np.max(primary_axis) - np.min(primary_axis)
402
403
404 1
def downsample_voxel(pc, voxel_size=0.01):
405
    '''Downsample a pointcloud using a voxel grid filter.
406
    Resulting pointcloud has the same SRS and offset as the input.
407
408
    Arguments:
409
        pc         : pcl.PointCloud
410
                     Original pointcloud
411
        float      : voxel_size
412
                     Grid spacing for the voxel grid
413
    Returns:
414
        pc : pcl.PointCloud
415
             filtered pointcloud
416
    '''
417 1
    pc_filter = pc.make_voxel_grid_filter()
418 1
    pc_filter.set_leaf_size(voxel_size, voxel_size, voxel_size)
419 1
    newpc = pc_filter.filter()
420
421 1
    force_srs(newpc, same_as=pc)
422
423 1
    return newpc
424
425
426 1
def downsample_random(pc, fraction, random_seed=None):
427
    """Randomly downsample pointcloud to a fraction of its size.
428
429
    Returns a pointcloud of size fraction * len(pc), rounded to the nearest
430
    integer.  Resulting pointcloud has the same SRS and offset as the input.
431
432
    Use random_seed=k for some integer k to get reproducible results.
433
    Arguments:
434
        pc : pcl.PointCloud
435
            Input pointcloud.
436
        fraction : float
437
            Fraction of points to include.
438
        random_seed : int, optional
439
            Seed to use in random number generator.
440
441
    Returns:
442
        pcl.Pointcloud
443
    """
444 1
    if not 0 < fraction <= 1:
445 1
        raise ValueError("Expected fraction in (0,1], got %r" % fraction)
446
447 1
    rng = np.random.RandomState(random_seed)
448
449 1
    k = max(int(round(fraction * len(pc))), 1)
450 1
    sample = rng.choice(len(pc), k, replace=False)
451 1
    new_pc = pc.extract(sample)
452
453 1
    force_srs(new_pc, same_as=pc)
454
455
    return new_pc
456