Passed
Pull Request — master (#182)
by Fernando
01:18
created

GridAggregator.crop_batch()   A

Complexity

Conditions 3

Size

Total Lines 33
Code Lines 29

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 29
nop 4
dl 0
loc 33
rs 9.184
c 0
b 0
f 0
1
import warnings
2
from typing import Tuple
3
import torch
4
import numpy as np
5
from ...torchio import TypeData, CHANNELS_DIMENSION
6
from .grid_sampler import GridSampler
7
8
9
class GridAggregator:
10
    r"""Aggregate patches for dense inference.
11
12
    This class is typically used to build a volume made of batches after
13
    inference of patches extracted by a :py:class:`~torchio.data.GridSampler`.
14
15
    Args:
16
        sampler: Instance of :py:class:`~torchio.data.GridSampler` used to
17
            extract the patches.
18
19
    .. note:: Adapted from NiftyNet. See `this NiftyNet tutorial
20
        <https://niftynet.readthedocs.io/en/dev/window_sizes.html>`_ for more
21
        information about patch based sampling.
22
    """
23
    def __init__(self, sampler: GridSampler):
24
        sample = sampler.sample
25
        self.spatial_shape = sample.spatial_shape
26
        self._output_tensor = None
27
        self.patch_overlap = sampler.patch_overlap
28
29
    def crop_batch(
30
            self,
31
            batch: torch.Tensor,
32
            locations: np.ndarray,
33
            overlap: np.ndarray,
34
            ) -> Tuple[TypeData, np.ndarray]:
35
        border = np.array(overlap) // 2  # overlap is even in grid sampler
36
        crop_locations = locations.astype(int).copy()
37
        indices_ini, indices_fin = crop_locations[:, :3], crop_locations[:, 3:]
38
        num_locations = len(crop_locations)
39
40
        # Do not crop patches at the border of the volume
41
        border_ini = np.tile(border, (num_locations, 1))
42
        border_fin = border_ini.copy()
43
        mask_border_ini = indices_ini == 0
44
        border_ini[mask_border_ini] = 0
45
        for axis, size in enumerate(self.spatial_shape):
46
            mask_border_fin = indices_fin[:, axis] == size
47
            border_fin[mask_border_fin, axis] = 0
48
        indices_ini += border_ini
49
        indices_fin -= border_fin
50
        crop_shapes = indices_fin - indices_ini
51
52
        patch_shape = batch.shape[2:]  # ignore batch and channels dim
53
        cropped_patches = []
54
        for patch, crop_shape in zip(batch, crop_shapes):
55
            diff = patch_shape - crop_shape
56
            left = (diff / 2).astype(int)
57
            i_ini, j_ini, k_ini = left
58
            i_fin, j_fin, k_fin = left + crop_shape
59
            cropped_patch = patch[:, i_ini:i_fin, j_ini:j_fin, k_ini:k_fin]
60
            cropped_patches.append(cropped_patch)
61
        return cropped_patches, crop_locations
62
63
    def initialize_output_tensor(self, batch: torch.Tensor) -> None:
64
        if self._output_tensor is not None:
65
            return
66
        num_channels = batch.shape[CHANNELS_DIMENSION]
67
        self._output_tensor = torch.zeros(
68
            num_channels,
69
            *self.spatial_shape,
70
            dtype=batch.dtype,
71
        )
72
73
    def add_batch(
74
            self,
75
            batch_tensor: torch.Tensor,
76
            locations: torch.Tensor,
77
            ) -> None:
78
        """Add batch processed by a CNN to the output prediction volume.
79
80
        Args:
81
            batch_tensor: 5D tensor, typically the output of a convolutional
82
                neural network, e.g. ``batch['image'][torchio.DATA]``.
83
            locations: 2D tensor with shape :math:`(B, 6)` representing the
84
                patch indices in the original image. They are typically
85
                extracted using ``batch[torchio.LOCATION]``.
86
        """
87
        batch = batch_tensor.cpu()
88
        locations = locations.cpu().numpy()
89
        self.initialize_output_tensor(batch)
90
        cropped_patches, crop_locations = self.crop_batch(
91
            batch,
92
            locations,
93
            self.patch_overlap,
94
        )
95
        for patch, crop_location in zip(cropped_patches, crop_locations):
96
            i_ini, j_ini, k_ini, i_fin, j_fin, k_fin = crop_location
97
            self._output_tensor[
98
                :,
99
                i_ini:i_fin,
100
                j_ini:j_fin,
101
                k_ini:k_fin] = patch
102
103
    def get_output_tensor(self) -> torch.Tensor:
104
        """Get the aggregated volume after dense inference."""
105
        if self._output_tensor.dtype == torch.int64:
106
            message = (
107
                'Medical image frameworks such as ITK do not support int64.'
108
                ' Casting to int32...'
109
            )
110
            warnings.warn(message)
111
            self._output_tensor = self._output_tensor.type(torch.int32)
112
        return self._output_tensor
113