Passed
Pull Request — master (#1929)
by
unknown
02:32
created

gammapy/background/ring.py (11 issues)

1
# Licensed under a 3-clause BSD style license - see LICENSE.rst
2
"""Ring background estimation."""
3
from __future__ import absolute_import, division, print_function, unicode_literals
4
from itertools import product
5
import numpy as np
6
from astropy.convolution import Ring2DKernel, Tophat2DKernel
7
from astropy.coordinates import Angle
8
from ..image.utils import scale_cube
9
10
__all__ = ["AdaptiveRingBackgroundEstimator", "RingBackgroundEstimator"]
11
12
13
class AdaptiveRingBackgroundEstimator(object):
0 ignored issues
show
The variable __class__ seems to be unused.
Loading history...
14
    """Adaptive ring background algorithm.
15
16
    This algorithm extends the `RingBackgroundEstimator` method by adapting the
17
    size of the ring to achieve a minimum on / off exposure ratio (alpha) in regions
18
    where the area to estimate the background from is limited.
19
20
    Parameters
21
    ----------
22
    r_in : `~astropy.units.Quantity`
23
        Inner radius of the ring.
24
    r_out_max : `~astropy.units.Quantity`
25
        Maximal outer radius of the ring.
26
    width : `~astropy.units.Quantity`
27
        Width of the ring.
28
    stepsize : `~astropy.units.Quantity`
29
        Stepsize used for increasing the radius.
30
    threshold_alpha : float
31
        Threshold on alpha above which the adaptive ring takes action.
32
    theta : `~astropy.units.Quantity`
33
        Integration radius used for alpha computation.
34
    method : {'fixed_width', 'fixed_r_in'}
35
        Adaptive ring method.
36
37
    Examples
38
    --------
39
    Example using `AdaptiveRingBackgroundEstimator`::
40
41
        from gammapy.maps import Map
42
        from gammapy.background import AdaptiveRingBackgroundEstimator
43
44
        filename = '$GAMMAPY_EXTRA/test_datasets/unbundled/poisson_stats_image/input_all.fits.gz'
45
        images = {
46
            'counts': Map.read(filename, hdu='counts'),
47
            'exposure_on': Map.read(filename, hdu='exposure'),
48
            'exclusion': Map.read(filename, hdu='exclusion'),
49
        }
50
51
        adaptive_ring_bkg = AdaptiveRingBackgroundEstimator(
52
            r_in='0.22 deg',
53
            r_out_max='0.8 deg',
54
            width='0.1 deg',
55
        )
56
        results = adaptive_ring_bkg.run(images)
57
        results['background'].plot()
58
59
    See Also
60
    --------
61
    RingBackgroundEstimator, gammapy.detect.KernelBackgroundEstimator
62
    """
63
64
    def __init__(
0 ignored issues
show
Too many arguments (8/5)
Loading history...
65
        self,
0 ignored issues
show
Wrong hanging indentation before block (add 4 spaces).
Loading history...
66
        r_in,
0 ignored issues
show
Wrong hanging indentation before block (add 4 spaces).
Loading history...
67
        r_out_max,
0 ignored issues
show
Wrong hanging indentation before block (add 4 spaces).
Loading history...
68
        width,
0 ignored issues
show
Wrong hanging indentation before block (add 4 spaces).
Loading history...
69
        stepsize="0.02 deg",
0 ignored issues
show
Wrong hanging indentation before block (add 4 spaces).
Loading history...
70
        threshold_alpha=0.1,
0 ignored issues
show
Wrong hanging indentation before block (add 4 spaces).
Loading history...
71
        theta="0.22 deg",
0 ignored issues
show
Wrong hanging indentation before block (add 4 spaces).
Loading history...
72
        method="fixed_width",
0 ignored issues
show
Wrong hanging indentation before block (add 4 spaces).
Loading history...
73
    ):
74
        stepsize = Angle(stepsize)
75
        theta = Angle(theta)
76
77
        if method not in ["fixed_width", "fixed_r_in"]:
78
            raise ValueError("Not a valid adaptive ring method.")
79
80
        self._parameters = {
81
            "r_in": Angle(r_in),
82
            "r_out_max": Angle(r_out_max),
83
            "width": Angle(width),
84
            "stepsize": Angle(stepsize),
85
            "threshold_alpha": threshold_alpha,
86
            "theta": Angle(theta),
87
            "method": method,
88
        }
89
90
    @property
91
    def parameters(self):
92
        """Parameter dict."""
93
        return self._parameters
94
95
    def kernels(self, image):
96
        """Ring kernels according to the specified method.
97
98
        Parameters
99
        ----------
100
        image : `~gammapy.maps.WcsNDMap`
101
            Map specifying the WCS information.
102
103
        Returns
104
        -------
105
        kernels : list
106
            List of `~astropy.convolution.Ring2DKernel`
107
        """
108
        p = self.parameters
109
110
        scale = image.geom.pixel_scales[0]
111
        r_in = (p["r_in"] / scale).to_value("")
112
        r_out_max = (p["r_out_max"] / scale).to_value("")
113
        width = (p["width"] / scale).to_value("")
114
        stepsize = (p["stepsize"] / scale).to_value("")
115
116
        if p["method"] == "fixed_width":
117
            r_ins = np.arange(r_in, (r_out_max - width), stepsize)
118
            widths = [width]
119
        elif p["method"] == "fixed_r_in":
120
            widths = np.arange(width, (r_out_max - r_in), stepsize)
121
            r_ins = [r_in]
122
        else:
123
            raise ValueError("Invalid method: {}".format(p["method"]))
124
125
        kernels = []
126
        for r_in, width in product(r_ins, widths):
127
            kernel = Ring2DKernel(r_in, width)
128
            kernel.normalize("peak")
129
            kernels.append(kernel)
130
131
        return kernels
132
133
    @staticmethod
134
    def _alpha_approx_cube(cubes):
135
        """Compute alpha as on_exposure / off_exposure.
136
137
        Where off_exposure < 0, alpha is set to infinity.
138
        """
139
        exposure_on = cubes["exposure_on"]
140
        exposure_off = cubes["exposure_off"]
141
        alpha_approx = np.where(exposure_off > 0, exposure_on / exposure_off, np.inf)
142
        return alpha_approx
143
144
    @staticmethod
145
    def _exposure_off_cube(exposure_on, exclusion, kernels):
146
        """Compute off exposure cube.
147
148
        The on exposure is convolved with different
149
        ring kernels and stacking the data along the third dimension.
150
        """
151
        exposure = exposure_on.data
152
        exclusion = exclusion.data
153
        return scale_cube(exposure * exclusion, kernels)
154
155
    def _exposure_on_cube(self, exposure_on, kernels):
156
        """Compute on exposure cube.
157
158
        Calculated by convolving the on exposure with a tophat
159
        of radius theta, and stacking all images along the third dimension.
160
        """
161
        scale = exposure_on.geom.pixel_scales[0].to("deg")
162
        theta = self.parameters["theta"] * scale
163
164
        tophat = Tophat2DKernel(theta.value)
165
        tophat.normalize("peak")
166
        exposure_on = exposure_on.convolve(tophat.array)
167
        exposure_on_cube = np.repeat(
168
            exposure_on.data[:, :, np.newaxis], len(kernels), axis=2
169
        )
170
        return exposure_on_cube
171
172
    @staticmethod
173
    def _off_cube(counts, exclusion, kernels):
174
        """Compute off cube.
175
176
        Calculated by convolving the raw counts with different ring kernels
177
        and stacking the data along the third dimension.
178
        """
179
        return scale_cube(counts.data * exclusion.data, kernels)
180
181
    def _reduce_cubes(self, cubes):
182
        """Compute off and off exposure map.
183
184
        Calulated by reducing the cubes. The data is
185
        iterated along the third axis (i.e. increasing ring sizes), the value
186
        with the first approximate alpha < threshold is taken.
187
        """
188
        threshold = self._parameters["threshold_alpha"]
189
190
        alpha_approx_cube = cubes["alpha_approx"]
191
        off_cube = cubes["off"]
192
        exposure_off_cube = cubes["exposure_off"]
193
194
        shape = alpha_approx_cube.shape[:2]
195
        off = np.tile(np.nan, shape)
196
        exposure_off = np.tile(np.nan, shape)
197
198
        for idx in np.arange(alpha_approx_cube.shape[-1]):
199
            mask = (alpha_approx_cube[:, :, idx] <= threshold) & np.isnan(off)
200
            off[mask] = off_cube[:, :, idx][mask]
201
            exposure_off[mask] = exposure_off_cube[:, :, idx][mask]
202
203
        return exposure_off, off
204
205
    def run(self, images):
206
        """Run adaptive ring background algorithm.
207
208
        Parameters
209
        ----------
210
        images : dict of `~gammapy.maps.WcsNDMap`
211
            Input sky maps.
212
213
        Returns
214
        -------
215
        result : dict of `~gammapy.maps.WcsNDMap`
216
            Result sky maps.
217
        """
218
        required = ["counts", "exposure_on", "exclusion"]
219
        counts, exposure_on, exclusion = [images[_] for _ in required]
220
221
        if not counts.geom.is_image:
222
            raise ValueError("Only 2D maps are supported")
223
224
        kernels = self.kernels(counts)
225
        cubes = {
226
            "exposure_on": self._exposure_on_cube(exposure_on, kernels),
227
            "exposure_off": self._exposure_off_cube(exposure_on, exclusion, kernels),
228
            "off": self._off_cube(counts, exclusion, kernels),
229
        }
230
        cubes["alpha_approx"] = self._alpha_approx_cube(cubes)
231
232
        exposure_off, off = self._reduce_cubes(cubes)
233
        alpha = exposure_on.data / exposure_off
234
235
        # set data outside fov to zero
236
        not_has_exposure = ~(exposure_on.data > 0)
237
        for data in [alpha, off, exposure_off]:
238
            data[not_has_exposure] = 0
239
240
        background = alpha * off
241
242
        return {
243
            "exposure_off": counts.copy(data=exposure_off),
244
            "off": counts.copy(data=off),
245
            "alpha": counts.copy(data=alpha),
246
            "background": counts.copy(data=background),
247
        }
248
249
250
class RingBackgroundEstimator(object):
0 ignored issues
show
The variable __class__ seems to be unused.
Loading history...
251
    """Ring background method for cartesian coordinates.
252
253
    - Step 1: apply exclusion mask
254
    - Step 2: ring-correlate
255
    - Step 3: apply psi cut
256
257
    TODO: add method to apply the psi cut
258
259
    Parameters
260
    ----------
261
    r_in : `~astropy.units.Quantity`
262
        Inner ring radius
263
    width : `~astropy.units.Quantity`
264
        Ring width.
265
    use_fft_convolution : bool
266
        Use FFT convolution?
267
268
269
    Examples
270
    --------
271
    Example using `RingBackgroundEstimator`::
272
273
        from gammapy.maps import Map
274
        from gammapy.background import RingBackgroundEstimator
275
276
        filename = '$GAMMAPY_EXTRA/test_datasets/unbundled/poisson_stats_image/input_all.fits.gz'
277
        images = {
278
            'counts': Map.read(filename, hdu='counts'),
279
            'exposure_on': Map.read(filename, hdu='exposure'),
280
            'exclusion': Map.read(filename, hdu='exclusion'),
281
        }
282
283
        ring_bkg = RingBackgroundEstimator(r_in='0.35 deg', width='0.3 deg')
284
        results = ring_bkg.run(images)
285
        results['background'].plot()
286
287
    See Also
288
    --------
289
    gammapy.detect.KernelBackgroundEstimator, AdaptiveRingBackgroundEstimator
290
    """
291
292
    def __init__(self, r_in, width, use_fft_convolution=False):
293
        self._parameters = {
294
            "r_in": Angle(r_in),
295
            "width": Angle(width),
296
            "use_fft_convolution": use_fft_convolution,
297
        }
298
299
    @property
300
    def parameters(self):
301
        """dict of parameters"""
302
        return self._parameters
303
304
    def kernel(self, image):
305
        """Ring kernel.
306
307
        Parameters
308
        ----------
309
        image : `~gammapy.maps.WcsNDMap`
310
            Input Map
311
312
        Returns
313
        -------
314
        ring : `~astropy.convolution.Ring2DKernel`
315
            Ring kernel.
316
        """
317
        p = self.parameters
318
319
        scale = image.geom.pixel_scales[0].to("deg")
320
        r_in = p["r_in"].to("deg") / scale
321
        width = p["width"].to("deg") / scale
322
323
        ring = Ring2DKernel(r_in.value, width.value)
324
        ring.normalize("peak")
325
        return ring
326
327
    def run(self, images):
328
        """Run ring background algorithm.
329
330
        Required Maps: {required}
331
332
        Parameters
333
        ----------
334
        images : dict of `~gammapy.maps.WcsNDMap`
335
            Input sky maps.
336
337
        Returns
338
        -------
339
        result : dict of `~gammapy.maps.WcsNDMap`
340
            Result sky maps
341
        """
342
        p = self.parameters
343
        required = ["counts", "exposure_on", "exclusion"]
344
345
        counts, exposure_on, exclusion = [images[_] for _ in required]
346
347
        result = {}
348
        ring = self.kernel(counts)
349
350
        counts_excluded = counts.copy(data=counts.data * exclusion.data.astype("float"))
351
        result["off"] = counts_excluded.convolve(
352
            ring.array, use_fft=p["use_fft_convolution"]
353
        )
354
355
        exposure_on_excluded = exposure_on.copy(
356
            data=exposure_on.data * exclusion.data.astype("float")
357
        )
358
        result["exposure_off"] = exposure_on_excluded.convolve(
359
            ring.array, use_fft=p["use_fft_convolution"]
360
        )
361
362
        with np.errstate(divide="ignore", invalid="ignore"):
363
            # set pixels, where ring is too small to NaN
364
            not_has_off_exposure = ~(result["exposure_off"].data > 0)
365
            result["exposure_off"].data[not_has_off_exposure] = np.nan
366
367
            not_has_exposure = ~(exposure_on.data > 0)
368
            result["off"].data[not_has_exposure] = 0
369
            result["exposure_off"].data[not_has_exposure] = 0
370
371
            result["alpha"] = exposure_on.copy(
372
                data=exposure_on.data / result["exposure_off"].data
373
            )
374
            result["alpha"].data[not_has_exposure] = 0
375
376
        result["background"] = counts.copy(
377
            data=result["alpha"].data * result["off"].data
378
        )
379
380
        return result
381
382
    def __str__(self):
383
        s = "RingBackground parameters: \n"
384
        s += "r_in : {}\n".format(self.parameters["r_in"])
385
        s += "width: {}\n".format(self.parameters["width"])
386
        return s
387
388
389
def ring_r_out(theta, r_in, area_factor):
390
    """Compute ring outer radius.
391
392
    The determining equation is:
393
        area_factor =
394
        off_area / on_area =
395
        (pi (r_out**2 - r_in**2)) / (pi * theta**2 )
396
397
    Parameters
398
    ----------
399
    theta : float
400
        On region radius
401
    r_in : float
402
        Inner ring radius
403
    area_factor : float
404
        Desired off / on area ratio
405
406
    Returns
407
    -------
408
    r_out : float
409
        Outer ring radius
410
    """
411
    return np.sqrt(area_factor * theta ** 2 + r_in ** 2)
412
413
414
def ring_area_factor(theta, r_in, r_out):
415
    """Compute ring area factor.
416
417
    Parameters
418
    ----------
419
    theta : float
420
        On region radius
421
    r_in : float
422
        Inner ring radius
423
    r_out : float
424
        Outer ring radius
425
    """
426
    return (r_out ** 2 - r_in ** 2) / theta ** 2
427
428
429
def ring_alpha(theta, r_in, r_out):
430
    """Compute ring alpha, the inverse area factor.
431
432
    Parameters
433
    ----------
434
    theta : float
435
        On region radius
436
    r_in : float
437
        Inner ring radius
438
    r_out : float
439
        Outer ring radius
440
    """
441
    return 1.0 / ring_area_factor(theta, r_in, r_out)
442