Passed
Push — master ( 1d1675...226afd )
by Axel
02:47 queued 11s
created

test_excess_map_estimator_map_dataset_on_off_with_correlation_model()   A

Complexity

Conditions 1

Size

Total Lines 27
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 18
nop 1
dl 0
loc 27
rs 9.5
c 0
b 0
f 0
1
# Licensed under a 3-clause BSD style license - see LICENSE.rst
2
import pytest
3
import numpy as np
4
from numpy.testing import assert_allclose
5
import astropy.units as u
6
from gammapy.datasets import MapDataset, MapDatasetOnOff
7
from gammapy.estimators import ExcessMapEstimator
8
from gammapy.maps import Map, MapAxis, WcsGeom
9
from gammapy.irf import PSFMap
10
from gammapy.modeling.models import (
11
    GaussianSpatialModel,
12
    PowerLawSpectralModel,
13
    SkyModel,
14
)
15
from gammapy.utils.testing import requires_data
16
from gammapy.estimators.utils import estimate_exposure_reco_energy
17
18
19
def image_to_cube(input_map, energy_min, energy_max):
20
    energy_min = u.Quantity(energy_min)
21
    energy_max = u.Quantity(energy_max)
22
    axis = MapAxis.from_energy_bounds(energy_min, energy_max, nbin=1)
23
    geom = input_map.geom.to_cube([axis])
24
    return Map.from_geom(geom, data=input_map.data[np.newaxis, :, :])
25
26
27
@pytest.fixture
28
def simple_dataset():
29
    axis = MapAxis.from_energy_bounds(0.1, 10, 1, unit="TeV")
30
    geom = WcsGeom.create(npix=20, binsz=0.02, axes=[axis])
31
    dataset = MapDataset.create(geom)
32
    dataset.mask_safe += np.ones(dataset.data_shape, dtype=bool)
33
    dataset.counts += 2
34
    dataset.background += 1
35
    return dataset
36
37
38
@pytest.fixture
39
def simple_dataset_on_off():
40
    axis = MapAxis.from_energy_bounds(0.1, 10, 2, unit="TeV")
41
    geom = WcsGeom.create(npix=20, binsz=0.02, axes=[axis])
42
    dataset = MapDatasetOnOff.create(geom)
43
    dataset.mask_safe += np.ones(dataset.data_shape, dtype=bool)
44
    dataset.counts += 2
45
    dataset.counts_off += 1
46
    dataset.acceptance += 1
47
    dataset.acceptance_off += 1
48
    dataset.exposure.data += 1e6
49
    dataset.psf = None
50
51
    mask_fit = Map.from_geom(geom, data=1, dtype=bool)
52
    mask_fit.data[:, :, 10] = False
53
    mask_fit.data[:, 10, :] = False
54
    dataset.mask_fit = mask_fit
55
    return dataset
56
57
58
@requires_data()
59
def test_compute_lima_image():
60
    """
61
    Test Li & Ma image against TS image for Tophat kernel
62
    """
63
    filename = "$GAMMAPY_DATA/tests/unbundled/poisson_stats_image/input_all.fits.gz"
64
    counts = Map.read(filename, hdu="counts")
65
    counts = image_to_cube(counts, "1 GeV", "100 GeV")
66
    background = Map.read(filename, hdu="background")
67
    background = image_to_cube(background, "1 GeV", "100 GeV")
68
    dataset = MapDataset(counts=counts, background=background)
69
70
    estimator = ExcessMapEstimator("0.1 deg")
71
    result_lima = estimator.run(dataset)
72
73
    assert_allclose(result_lima["sqrt_ts"].data[:, 100, 100], 30.814916, atol=1e-3)
74
    assert_allclose(result_lima["sqrt_ts"].data[:, 1, 1], 0.164, atol=1e-3)
75
76
77
@requires_data()
78
def test_compute_lima_on_off_image():
79
    """
80
    Test Li & Ma image with snippet from the H.E.S.S. survey data.
81
    """
82
    filename = "$GAMMAPY_DATA/tests/unbundled/hess/survey/hess_survey_snippet.fits.gz"
83
    n_on = Map.read(filename, hdu="ON")
84
    counts = image_to_cube(n_on, "1 TeV", "100 TeV")
85
    n_off = Map.read(filename, hdu="OFF")
86
    counts_off = image_to_cube(n_off, "1 TeV", "100 TeV")
87
    a_on = Map.read(filename, hdu="ONEXPOSURE")
88
    acceptance = image_to_cube(a_on, "1 TeV", "100 TeV")
89
    a_off = Map.read(filename, hdu="OFFEXPOSURE")
90
    acceptance_off = image_to_cube(a_off, "1 TeV", "100 TeV")
91
    dataset = MapDatasetOnOff(
92
        counts=counts,
93
        counts_off=counts_off,
94
        acceptance=acceptance,
95
        acceptance_off=acceptance_off,
96
    )
97
98
    significance = Map.read(filename, hdu="SIGNIFICANCE")
99
    significance = image_to_cube(significance, "1 TeV", "10 TeV")
100
    estimator = ExcessMapEstimator("0.1 deg", correlate_off=False)
101
    results = estimator.run(dataset)
102
103
    # Reproduce safe significance threshold from HESS software
104
    results["sqrt_ts"].data[results["npred"].data < 5] = 0
105
106
    # crop the image at the boundaries, because the reference image
107
    # is cut out from a large map, there is no way to reproduce the
108
    # result with regular boundary handling
109
    actual = results["sqrt_ts"].crop((11, 11)).data
110
    desired = significance.crop((11, 11)).data
111
112
    # Set boundary to NaN in reference image
113
    # The absolute tolerance is low because the method used here is slightly different from the one used in HGPS
114
    # n_off is convolved as well to ensure the method applies to true ON-OFF datasets
115
    assert_allclose(actual, desired, atol=0.2)
116
117
118
def test_significance_map_estimator_map_dataset(simple_dataset):
119
    simple_dataset.exposure = None
120
    estimator = ExcessMapEstimator(0.1 * u.deg, selection_optional=["all"])
121
    result = estimator.run(simple_dataset)
122
123
    assert_allclose(result["npred"].data[0, 10, 10], 162)
124
    assert_allclose(result["npred_excess"].data[0, 10, 10], 81)
125
    assert_allclose(result["sqrt_ts"].data[0, 10, 10], 7.910732, atol=1e-5)
126
127
    assert_allclose(result["npred_excess_err"].data[0, 10, 10], 12.727922, atol=1e-3)
128
    assert_allclose(result["npred_excess_errp"].data[0, 10, 10], 13.063328, atol=1e-3)
129
    assert_allclose(result["npred_excess_errn"].data[0, 10, 10], 12.396716, atol=1e-3)
130
    assert_allclose(result["npred_excess_ul"].data[0, 10, 10], 107.806275, atol=1e-3)
131
132
133
def test_significance_map_estimator_map_dataset_exposure(simple_dataset):
134
    simple_dataset.exposure += 1e10 * u.cm ** 2 * u.s
135
    axis = simple_dataset.exposure.geom.axes[0]
136
    simple_dataset.psf = PSFMap.from_gauss(axis, sigma="0.05 deg")
137
138
    model = SkyModel(
139
        PowerLawSpectralModel(amplitude="1e-9 cm-2 s-1 TeV-1"),
140
        GaussianSpatialModel(
141
            lat_0=0.0 * u.deg, lon_0=0.0 * u.deg, sigma=0.1 * u.deg, frame="icrs"
142
        ),
143
        name="sky_model",
144
    )
145
146
    simple_dataset.models = [model]
147
    simple_dataset.npred()
148
149
    estimator = ExcessMapEstimator(0.1 * u.deg, selection_optional="all")
150
    result = estimator.run(simple_dataset)
151
152
    assert_allclose(result["npred_excess"].data.sum(), 19733.602, rtol=1e-3)
153
    assert_allclose(result["sqrt_ts"].data[0, 10, 10], 4.217129, rtol=1e-3)
154
155
156 View Code Duplication
def test_excess_map_estimator_map_dataset_on_off_no_correlation(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
157
    simple_dataset_on_off,
158
):
159
    # Test with exposure
160
    estimator_image = ExcessMapEstimator(
161
        0.11 * u.deg, energy_edges=[0.1, 1] * u.TeV, correlate_off=False
162
    )
163
164
    result_image = estimator_image.run(simple_dataset_on_off)
165
    assert result_image["npred"].data.shape == (1, 20, 20)
166
    assert_allclose(result_image["npred"].data[0, 10, 10], 194)
167
    assert_allclose(result_image["npred_excess"].data[0, 10, 10], 97)
168
    assert_allclose(result_image["sqrt_ts"].data[0, 10, 10], 0.780125, atol=1e-3)
169
    assert_allclose(result_image["flux"].data[:, 10, 10], 9.7e-9, atol=1e-5)
170
171
172
def test_excess_map_estimator_map_dataset_on_off_with_correlation_no_exposure(
173
    simple_dataset_on_off,
174
):
175
    # First without exposure
176
    simple_dataset_on_off.exposure = None
177
    simple_dataset_on_off.mask_fit = None
178
179
    estimator = ExcessMapEstimator(
180
        0.11 * u.deg, energy_edges=[0.1, 1, 10] * u.TeV, correlate_off=True
181
    )
182
    result = estimator.run(simple_dataset_on_off)
183
184
    assert result["npred"].data.shape == (2, 20, 20)
185
    assert_allclose(result["npred"].data[:, 10, 10], 194)
186
    assert_allclose(result["npred_excess"].data[:, 10, 10], 97)
187
    assert_allclose(result["sqrt_ts"].data[:, 10, 10], 5.741116, atol=1e-5)
188
189
190 View Code Duplication
def test_excess_map_estimator_map_dataset_on_off_with_correlation(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
191
    simple_dataset_on_off,
192
):
193
    simple_dataset_on_off.exposure = None
194
    simple_dataset_on_off.mask_fit = None
195
196
    estimator_image = ExcessMapEstimator(
197
        0.11 * u.deg, energy_edges=[0.1, 1] * u.TeV, correlate_off=True
198
    )
199
200
    result_image = estimator_image.run(simple_dataset_on_off)
201
    assert result_image["npred"].data.shape == (1, 20, 20)
202
    assert_allclose(result_image["npred"].data[0, 10, 10], 194)
203
    assert_allclose(result_image["npred_excess"].data[0, 10, 10], 97)
204
    assert_allclose(result_image["sqrt_ts"].data[0, 10, 10], 5.741116, atol=1e-3)
205
    assert_allclose(result_image["flux"].data[:, 10, 10], 9.7e-9, atol=1e-5)
206
207
208
def test_excess_map_estimator_map_dataset_on_off_with_correlation_mask_fit(
209
    simple_dataset_on_off,
210
):
211
    estimator_image = ExcessMapEstimator(
212
        0.11 * u.deg, apply_mask_fit=True, correlate_off=True
213
    )
214
215
    result_image = estimator_image.run(simple_dataset_on_off)
216
    assert result_image["npred"].data.shape == (1, 20, 20)
217
218
    assert_allclose(result_image["sqrt_ts"].data[0, 10, 10], np.nan, atol=1e-3)
219
    assert_allclose(result_image["npred"].data[0, 10, 10], np.nan)
220
    assert_allclose(result_image["npred_excess"].data[0, 10, 10], np.nan)
221
    assert_allclose(result_image["sqrt_ts"].data[0, 9, 9], 7.186745, atol=1e-3)
222
    assert_allclose(result_image["npred"].data[0, 9, 9], 304)
223
    assert_allclose(result_image["npred_excess"].data[0, 9, 9], 152)
224
225
    assert result_image["flux"].unit == u.Unit("cm-2s-1")
226
    assert_allclose(result_image["flux"].data[0, 9, 9], 1.190928e-08, rtol=1e-3)
227
228
229
def test_excess_map_estimator_map_dataset_on_off_with_correlation_model(
230
    simple_dataset_on_off,
231
):
232
    model = SkyModel(
233
        PowerLawSpectralModel(amplitude="1e-9 cm-2 s-1TeV-1"),
234
        GaussianSpatialModel(
235
            lat_0=0.0 * u.deg, lon_0=0.0 * u.deg, sigma=0.1 * u.deg, frame="icrs"
236
        ),
237
        name="sky_model",
238
    )
239
240
    simple_dataset_on_off.models = [model]
241
242
    estimator_mod = ExcessMapEstimator(
243
        0.11 * u.deg, apply_mask_fit=False, correlate_off=True
244
    )
245
    result_mod = estimator_mod.run(simple_dataset_on_off)
246
    assert result_mod["npred"].data.shape == (1, 20, 20)
247
248
    assert_allclose(result_mod["sqrt_ts"].data[0, 10, 10], 8.899278, atol=1e-3)
249
250
    assert_allclose(result_mod["npred"].data[0, 10, 10], 388)
251
    assert_allclose(result_mod["npred_excess"].data[0, 10, 10], 190.68057)
252
253
    assert result_mod["flux"].unit == "cm-2s-1"
254
    assert_allclose(result_mod["flux"].data[0, 10, 10], 1.906806e-08, rtol=1e-3)
255
    assert_allclose(result_mod["flux"].data.sum(), 5.920642e-06, rtol=1e-3)
256
257
258
def test_excess_map_estimator_map_dataset_on_off_reco_exposure(
259
    simple_dataset_on_off,
260
):
261
262
    # TODO: this has never worked...
263
    model = SkyModel(
264
        PowerLawSpectralModel(amplitude="1e-9 cm-2 s-1TeV-1"),
265
        GaussianSpatialModel(
266
            lat_0=0.0 * u.deg, lon_0=0.0 * u.deg, sigma=0.1 * u.deg, frame="icrs"
267
        ),
268
        name="sky_model",
269
    )
270
271
    simple_dataset_on_off.models = [model]
272
273
    spectral_model = PowerLawSpectralModel(index=15)
274
    estimator_mod = ExcessMapEstimator(
275
        0.11 * u.deg,
276
        apply_mask_fit=False,
277
        correlate_off=True,
278
        spectral_model=spectral_model,
279
    )
280
    result_mod = estimator_mod.run(simple_dataset_on_off)
281
282
    assert result_mod["flux"].unit == "cm-2s-1"
283
    assert_allclose(result_mod["flux"].data.sum(), 5.920642e-06, rtol=1e-3)
284
285
    reco_exposure = estimate_exposure_reco_energy(
286
        simple_dataset_on_off, spectral_model=spectral_model
287
    )
288
    assert_allclose(reco_exposure.data.sum(), 7.977796e12, rtol=0.001)
289
290
291
def test_incorrect_selection():
292
    with pytest.raises(ValueError):
293
        ExcessMapEstimator(0.11 * u.deg, selection_optional=["bad"])
294
295
    with pytest.raises(ValueError):
296
        ExcessMapEstimator(0.11 * u.deg, selection_optional=["ul", "bad"])
297
298
    estimator = ExcessMapEstimator(0.11 * u.deg)
299
    with pytest.raises(ValueError):
300
        estimator.selection_optional = "bad"
301
302
303
def test_significance_map_estimator_incorrect_dataset():
304
    with pytest.raises(ValueError):
305
        ExcessMapEstimator("bad")
306