gammapy.datasets.tests.test_map   (rating: F)

Complexity

Total Complexity 82

Size/Duplication

Total Lines 1867
Duplicated Lines 2.04 %

Importance

Changes 0
Metric   Value
eloc     1315   (executable lines of code)
dl       38     (duplicated lines)
loc      1867   (lines of code)
rs       0.8
c        0
b        0
f        0
wmc      82     (weighted method count; equals the total complexity above)

69 Functions

Rating   Name   Duplication   Size   Complexity  
A geom() 0 9 1
B test_to_spectrum_dataset() 0 52 1
A test_map_dataset_str_empty() 0 3 1
A test_info_dict() 0 34 1
A test_different_exposure_unit() 0 15 1
A test_downsample_energy() 0 22 1
A get_fermi_3fhl_gc_dataset() 0 16 1
A test_to_image_3fhl() 0 13 1
A test_downsample() 0 17 2
A test_energy_range() 0 42 1
A test_resample_energy_3fhl() 0 20 1
A test_to_image_mask_safe() 0 27 1
A test_map_dataset_str() 0 15 1
A test_fake() 0 17 1
A geom_hpx() 0 11 1
A get_psf() 0 19 1
A get_edisp() 0 11 1
A geom_etrue() 0 9 1
A geom_hpx_partial() 0 13 1
A get_exposure() 0 13 1
A geom_image() 0 6 1
A get_map_dataset() 0 41 3
A geom1() 0 10 1
A sky_model() 0 10 1
A test_npred_psf_after_edisp() 0 28 1
A images() 0 11 1
B test_stack_npred() 0 62 1
A test_npred() 0 32 2
A to_cube() 0 5 1
A test_slice_by_idx() 0 43 1
A test_map_dataset_on_off_cutout() 0 15 1
A test_map_dataset_region_geom_npred() 0 25 1
A test_map_dataset_on_off_to_spectrum_dataset_weights() 0 41 1
A test_peek() 0 6 2
B test_map_dataset_on_off_fits_io() 0 71 3
A test_datasets_io_no_model() 0 15 1
A test_create_with_migra() 0 28 1
A test_map_dataset_on_off_fake() 0 15 1
A test_map_fit_linked() 0 35 1
A test_map_fit_one_energy_bin() 0 39 1
A test_map_dataset_stack_hpx_geom() 0 17 1
A test_stack_onoff() 0 13 1
A get_map_dataset_onoff() 0 29 1
A test_map_dataset_on_off_to_image() 0 35 1
A test_map_dataset_geom() 0 14 2
A test_names() 0 32 1
A test_create_onoff() 0 23 1
A test_map_dataset_create_hpx_geom_partial() 19 19 1
A test_map_dataset_hpx_geom_npred() 0 14 1
A test_source_outside_geom_fermi() 0 13 1
A test_dataset_mixed_geom() 0 46 1
A test_plot_residual_onoff() 0 18 2
A test_downsample_onoff() 0 23 1
A test_dataset_cutout_aligned() 0 12 1
B test_stack() 0 96 1
A test_map_dataset_onoff_str() 0 4 1
A test_info_dict_on_off() 0 14 1
B test_map_dataset_fits_io() 0 82 1
A test_map_datasets_on_off_fits_io() 0 17 2
A test_stack_dataset_dataset_on_off() 0 16 1
A test_source_outside_geom() 0 18 1
A test_map_dataset_on_off_to_spectrum_dataset() 0 25 1
A test_map_dataset_create_hpx_geom() 19 19 1
A test_to_map_dataset() 0 23 1
B test_map_fit() 0 67 2
A test_create() 0 27 1
A test_stack_onoff_cutout() 0 28 1
A test_map_dataset_name() 0 6 3
A test_region_geom_io() 0 15 1

How to fix

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to restructure code once it is duplicated in three or more places.

In this file the duplication is concentrated in test_map_dataset_create_hpx_geom() and test_map_dataset_create_hpx_geom_partial(), each flagged with 19 duplicated lines in the table above (38 lines in total, the 2.04 % reported under Size/Duplication). A shared helper or a parametrized test removes it, as sketched below.
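A minimal sketch of that direction, assuming the two tests differ only in which geometry fixture they consume; the fixture names come from this file, but the test body and its assertion are placeholders, not the real duplicated lines:

import pytest
from gammapy.datasets import MapDataset


@pytest.mark.parametrize("geom_fixture", ["geom_hpx", "geom_hpx_partial"])
def test_map_dataset_create_hpx(geom_fixture, request):
    # request.getfixturevalue lets a single test body reuse either geometry fixture
    geoms = request.getfixturevalue(geom_fixture)
    dataset = MapDataset.create(**geoms)
    # placeholder assertion - the 19 duplicated lines of checks would move here
    assert dataset.counts.geom == geoms["geom"]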

Complexity

 Tip:   Before tackling complexity, make sure to eliminate any duplication first. This can often reduce the size of classes significantly.

Complex modules like gammapy.datasets.tests.test_map often do a lot of different things. To break such a module down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for functions and fixtures that share the same prefixes or suffixes.

Once you have determined the functions and fixtures that belong together, you can apply the Extract Class refactoring, for example by moving them into a dedicated test class or module. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
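A minimal sketch of that idea applied to this module, assuming the geometry fixtures and the tests that only need them form such a cohesive component; the fixture body is copied from the geom() fixture in this file, while the class name and the test are illustrative placeholders:

import pytest
from gammapy.maps import MapAxis, WcsGeom


class TestMapDatasetGeometry:
    """Cohesive component: geometry fixtures plus the tests that only use them."""

    @pytest.fixture
    def geom(self):
        axis = MapAxis.from_energy_bounds("0.1 TeV", "10 TeV", nbin=2)
        return WcsGeom.create(
            skydir=(266.40498829, -28.93617776),
            binsz=0.02,
            width=(2, 2),
            frame="icrs",
            axes=[axis],
        )

    def test_geom_energy_axis(self, geom):
        # illustrative check only
        assert geom.axes["energy"].nbin == 2

The module source, as analyzed, is listed below.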

# Licensed under a 3-clause BSD style license - see LICENSE.rst
import json
import pytest
import numpy as np
from numpy.testing import assert_allclose
import astropy.units as u
from astropy.coordinates import SkyCoord
from astropy.table import Table
from regions import CircleSkyRegion
from gammapy.catalog import SourceCatalog3FHL
from gammapy.data import GTI
from gammapy.datasets import Datasets, MapDataset, MapDatasetOnOff
from gammapy.datasets.map import RAD_AXIS_DEFAULT
from gammapy.irf import (
    EDispKernelMap,
    EDispMap,
    EffectiveAreaTable2D,
    EnergyDependentMultiGaussPSF,
    EnergyDispersion2D,
    PSFMap,
)
from gammapy.makers.utils import make_map_exposure_true_energy, make_psf_map
from gammapy.maps import HpxGeom, Map, MapAxis, RegionGeom, WcsGeom, WcsNDMap
from gammapy.maps.io import JsonQuantityEncoder
from gammapy.modeling import Fit
from gammapy.modeling.models import (
    DiskSpatialModel,
    FoVBackgroundModel,
    GaussianSpatialModel,
    Models,
    PointSpatialModel,
    PowerLawSpectralModel,
    SkyModel,
)
from gammapy.utils.testing import mpl_plot_check, requires_data, requires_dependency


@pytest.fixture
def geom_hpx():
    axis = MapAxis.from_energy_bounds("1 TeV", "10 TeV", nbin=3)

    energy_axis_true = MapAxis.from_energy_bounds(
        "1 TeV", "10 TeV", nbin=4, name="energy_true"
    )

    geom = HpxGeom.create(nside=32, axes=[axis], frame="galactic")

    return {"geom": geom, "energy_axis_true": energy_axis_true}


@pytest.fixture
def geom_hpx_partial():
    axis = MapAxis.from_energy_bounds("1 TeV", "10 TeV", nbin=3)

    energy_axis_true = MapAxis.from_energy_bounds(
        "1 TeV", "10 TeV", nbin=4, name="energy_true"
    )

    geom = HpxGeom.create(
        nside=32, axes=[axis], frame="galactic", region="DISK(110.,75.,10.)"
    )

    return {"geom": geom, "energy_axis_true": energy_axis_true}


@pytest.fixture
def geom():
    axis = MapAxis.from_energy_bounds("0.1 TeV", "10 TeV", nbin=2)
    return WcsGeom.create(
        skydir=(266.40498829, -28.93617776),
        binsz=0.02,
        width=(2, 2),
        frame="icrs",
        axes=[axis],
    )


@pytest.fixture
def geom1():
    e_axis = MapAxis.from_energy_bounds("0.1 TeV", "10 TeV", nbin=20)
    t_axis = MapAxis.from_bounds(0, 10, 2, name="time", unit="s")
    return WcsGeom.create(
        skydir=(266.40498829, -28.93617776),
        binsz=0.02,
        width=(3, 2),
        frame="icrs",
        axes=[e_axis, t_axis],
    )


@pytest.fixture
def geom_etrue():
    axis = MapAxis.from_energy_bounds("0.1 TeV", "10 TeV", nbin=3, name="energy_true")
    return WcsGeom.create(
        skydir=(266.40498829, -28.93617776),
        binsz=0.02,
        width=(2, 2),
        frame="icrs",
        axes=[axis],
    )


@pytest.fixture
def geom_image():
    energy = np.logspace(-1.0, 1.0, 2)
    axis = MapAxis.from_edges(energy, name="energy", unit=u.TeV, interp="log")
    return WcsGeom.create(
        skydir=(0, 0), binsz=0.02, width=(2, 2), frame="galactic", axes=[axis]
    )


def get_exposure(geom_etrue):
    filename = (
        "$GAMMAPY_DATA/cta-1dc/caldb/data/cta/1dc/bcf/South_z20_50h/irf_file.fits"
    )
    aeff = EffectiveAreaTable2D.read(filename, hdu="EFFECTIVE AREA")

    exposure_map = make_map_exposure_true_energy(
        pointing=SkyCoord(1, 0.5, unit="deg", frame="galactic"),
        livetime=1 * u.hr,
        aeff=aeff,
        geom=geom_etrue,
    )
    return exposure_map


def get_psf():
    filename = (
        "$GAMMAPY_DATA/cta-1dc/caldb/data/cta/1dc/bcf/South_z20_50h/irf_file.fits"
    )
    psf = EnergyDependentMultiGaussPSF.read(filename, hdu="POINT SPREAD FUNCTION")

    geom = WcsGeom.create(
        skydir=(0, 0),
        frame="galactic",
        binsz=2,
        width=(2, 2),
        axes=[RAD_AXIS_DEFAULT, psf.axes["energy_true"]],
    )

    return make_psf_map(
        psf=psf,
        pointing=SkyCoord(0, 0.5, unit="deg", frame="galactic"),
        geom=geom,
        exposure_map=Map.from_geom(geom.squash("rad"), unit="cm2 s"),
    )


@requires_data()
def get_edisp(geom, geom_etrue):
    filename = "$GAMMAPY_DATA/hess-dl3-dr1/data/hess_dl3_dr1_obs_id_020136.fits.gz"
    edisp2d = EnergyDispersion2D.read(filename, hdu="EDISP")
    energy = geom.axes["energy"].edges
    energy_true = geom_etrue.axes["energy_true"].edges
    edisp_kernel = edisp2d.to_edisp_kernel(
        offset="1.2 deg", energy=energy, energy_true=energy_true
    )
    edisp = EDispKernelMap.from_edisp_kernel(edisp_kernel)
    return edisp


@pytest.fixture
def sky_model():
    spatial_model = GaussianSpatialModel(
        lon_0="0.2 deg", lat_0="0.1 deg", sigma="0.2 deg", frame="galactic"
    )
    spectral_model = PowerLawSpectralModel(
        index=3, amplitude="1e-11 cm-2 s-1 TeV-1", reference="1 TeV"
    )
    return SkyModel(
        spatial_model=spatial_model, spectral_model=spectral_model, name="test-model"
    )

def get_map_dataset(geom, geom_etrue, edisp="edispmap", name="test", **kwargs):
    """Returns a MapDataset"""
    # define background model
    background = Map.from_geom(geom)
    background.data += 0.2

    psf = get_psf()
    exposure = get_exposure(geom_etrue)

    e_reco = geom.axes["energy"]
    e_true = geom_etrue.axes["energy_true"]

    if edisp == "edispmap":
        edisp = EDispMap.from_diagonal_response(energy_axis_true=e_true)
        data = exposure.get_spectrum(geom.center_skydir).data
        edisp.exposure_map.data = np.repeat(data, 2, axis=-1)
    elif edisp == "edispkernelmap":
        edisp = EDispKernelMap.from_diagonal_response(
            energy_axis=e_reco, energy_axis_true=e_true
        )
        data = exposure.get_spectrum(geom.center_skydir).data
        edisp.exposure_map.data = np.repeat(data, 2, axis=-1)
    else:
        edisp = None

    # define fit mask
    center = SkyCoord("0.2 deg", "0.1 deg", frame="galactic")
    circle = CircleSkyRegion(center=center, radius=1 * u.deg)
    mask_fit = geom.region_mask([circle])

    models = FoVBackgroundModel(dataset_name=name)

    return MapDataset(
        models=models,
        exposure=exposure,
        background=background,
        psf=psf,
        edisp=edisp,
        mask_fit=mask_fit,
        name=name,
        **kwargs,
    )


def test_map_dataset_name():
    with pytest.raises(ValueError, match="of type '<class 'int'>"):
        _ = MapDataset(name=6353)

    with pytest.raises(ValueError, match="of type '<class 'list'>"):
        _ = MapDataset(name=[1233, "234"])


@requires_data()
def test_map_dataset_str(sky_model, geom, geom_etrue):
    dataset = get_map_dataset(geom, geom_etrue)

    bkg_model = FoVBackgroundModel(dataset_name=dataset.name)
    dataset.models = [sky_model, bkg_model]

    dataset.counts = dataset.npred()
    dataset.mask_safe = dataset.mask_fit
    assert "MapDataset" in str(dataset)
    assert "(frozen)" in str(dataset)
    assert "background" in str(dataset)

    dataset.mask_safe = None
    assert "MapDataset" in str(dataset)


def test_map_dataset_str_empty():
    dataset = MapDataset()
    assert "MapDataset" in str(dataset)


@requires_data()
def test_fake(sky_model, geom, geom_etrue):
    """Test the fake dataset"""
    dataset = get_map_dataset(geom, geom_etrue)

    bkg_model = FoVBackgroundModel(dataset_name=dataset.name)
    dataset.models = [sky_model, bkg_model]

    npred = dataset.npred()
    assert np.all(npred.data >= 0)  # npred must be positive
    dataset.counts = npred
    real_dataset = dataset.copy()
    dataset.fake(314)

    assert real_dataset.counts.data.shape == dataset.counts.data.shape
    assert_allclose(real_dataset.counts.data.sum(), 9525.299054, rtol=1e-5)
    assert_allclose(dataset.counts.data.sum(), 9711)


@requires_data()
def test_different_exposure_unit(sky_model, geom):
    energy_range_true = np.logspace(2, 4, 3)
    axis = MapAxis.from_edges(
        energy_range_true, name="energy_true", unit="GeV", interp="log"
    )
    geom_gev = geom.to_image().to_cube([axis])
    dataset = get_map_dataset(geom, geom_gev, edisp="None")

    bkg_model = FoVBackgroundModel(dataset_name=dataset.name)
    dataset.models = [sky_model, bkg_model]

    npred = dataset.npred()

    assert_allclose(npred.data[0, 50, 50], 6.086019, rtol=1e-2)


@pytest.mark.parametrize(("edisp_mode"), ["edispmap", "edispkernelmap"])
@requires_data()
def test_to_spectrum_dataset(sky_model, geom, geom_etrue, edisp_mode):

    dataset_ref = get_map_dataset(geom, geom_etrue, edisp=edisp_mode)

    bkg_model = FoVBackgroundModel(dataset_name=dataset_ref.name)
    dataset_ref.models = [sky_model, bkg_model]

    dataset_ref.counts = dataset_ref.npred_background() * 0.0
    dataset_ref.counts.data[1, 50, 50] = 1
    dataset_ref.counts.data[1, 60, 50] = 1

    gti = GTI.create([0 * u.s], [1 * u.h], reference_time="2010-01-01T00:00:00")
    dataset_ref.gti = gti
    on_region = CircleSkyRegion(center=geom.center_skydir, radius=0.05 * u.deg)
    spectrum_dataset = dataset_ref.to_spectrum_dataset(on_region)
    spectrum_dataset_corrected = dataset_ref.to_spectrum_dataset(
        on_region, containment_correction=True
    )
    mask = np.ones_like(dataset_ref.counts, dtype="bool")
    mask[1, 40:60, 40:60] = False
    dataset_ref.mask_safe = Map.from_geom(dataset_ref.counts.geom, data=mask)
    spectrum_dataset_mask = dataset_ref.to_spectrum_dataset(on_region)

    assert np.sum(spectrum_dataset.counts.data) == 1
    assert spectrum_dataset.data_shape == (2, 1, 1)
    assert spectrum_dataset.background.geom.axes[0].nbin == 2
    assert spectrum_dataset.exposure.geom.axes[0].nbin == 3
    assert spectrum_dataset.exposure.unit == "m2s"

    energy_axis = geom.axes["energy"]
    assert (
        spectrum_dataset.edisp.get_edisp_kernel(energy_axis=energy_axis)
        .axes["energy"]
        .nbin
        == 2
    )
    assert (
        spectrum_dataset.edisp.get_edisp_kernel(energy_axis=energy_axis)
        .axes["energy_true"]
        .nbin
        == 3
    )

    assert_allclose(spectrum_dataset.edisp.exposure_map.data[1], 3.069227e09, rtol=1e-5)
    assert np.sum(spectrum_dataset_mask.counts.data) == 0
    assert spectrum_dataset_mask.data_shape == (2, 1, 1)
    assert spectrum_dataset_corrected.exposure.unit == "m2s"

    assert_allclose(spectrum_dataset.exposure.data[1], 3.070884e09, rtol=1e-5)
    assert_allclose(spectrum_dataset_corrected.exposure.data[1], 2.05201e09, rtol=1e-5)


@requires_data()
def test_energy_range(sky_model, geom1, geom_etrue):
    sky_coord1 = SkyCoord(266.5, -29.3, unit="deg")
    region1 = CircleSkyRegion(sky_coord1, 0.5 * u.deg)
    mask1 = geom1.region_mask([region1]) & geom1.energy_mask(1 * u.TeV, 7 * u.TeV)
    sky_coord2 = SkyCoord(266.5, -28.7, unit="deg")
    region2 = CircleSkyRegion(sky_coord2, 0.5 * u.deg)
    mask2 = geom1.region_mask([region2]) & geom1.energy_mask(2 * u.TeV, 8 * u.TeV)
    mask3 = geom1.energy_mask(3 * u.TeV, 6 * u.TeV)

    mask_safe = Map.from_geom(geom1, data=(mask1 | mask2 | mask3).data)
    dataset = get_map_dataset(geom1, geom_etrue, edisp=None, mask_safe=mask_safe)
    energy = geom1.axes["energy"].edges.value

    e_min, e_max = dataset.energy_range_safe
    assert_allclose(e_min.get_by_coord((265, -28, 0)), energy[15])
    assert_allclose(e_max.get_by_coord((265, -28, 5)), energy[17])
    assert_allclose(e_min.get_by_coord((sky_coord1.ra, sky_coord1.dec, 6)), energy[10])
    assert_allclose(e_max.get_by_coord((sky_coord1.ra, sky_coord1.dec, 1)), energy[18])
    assert_allclose(e_min.get_by_coord((sky_coord2.ra, sky_coord2.dec, 2)), energy[14])
    assert_allclose(e_max.get_by_coord((sky_coord2.ra, sky_coord2.dec, 7)), energy[19])
    assert_allclose(e_min.get_by_coord((266.5, -29, 8)), energy[10])
    assert_allclose(e_max.get_by_coord((266.5, -29, 3)), energy[19])

    e_min, e_max = dataset.energy_range_fit
    assert_allclose(e_min.get_by_coord((265, -28, 0)), np.nan)
    assert_allclose(e_max.get_by_coord((265, -28, 5)), np.nan)
    assert_allclose(e_min.get_by_coord((266, -29, 4)), energy[0])
    assert_allclose(e_max.get_by_coord((266, -29, 9)), energy[20])

    e_min, e_max = dataset.energy_range
    assert_allclose(e_min.get_by_coord((266, -29, 4)), energy[15])
    assert_allclose(e_max.get_by_coord((266, -29, 9)), energy[17])

    mask_zeros = Map.from_geom(geom1, data=np.zeros_like(mask_safe))
    e_min, e_max = dataset._energy_range(mask_zeros)
    assert_allclose(e_min.get_by_coord((266.5, -29, 8)), np.nan)
    assert_allclose(e_max.get_by_coord((266.5, -29, 3)), np.nan)

    e_min, e_max = dataset._energy_range()
    assert_allclose(e_min.get_by_coord((265, -28, 0)), energy[0])
    assert_allclose(e_max.get_by_coord((265, -28, 5)), energy[20])


@requires_data()
def test_info_dict(sky_model, geom, geom_etrue):
    dataset = get_map_dataset(geom, geom_etrue)

    bkg_model = FoVBackgroundModel(dataset_name=dataset.name)
    dataset.models = [sky_model, bkg_model]

    dataset.counts = dataset.npred()
    info_dict = dataset.info_dict()

    assert_allclose(info_dict["counts"], 9526, rtol=1e-3)
    assert_allclose(info_dict["background"], 4000.0005, rtol=1e-3)
    assert_allclose(info_dict["npred_background"], 4000.0, rtol=1e-3)
    assert_allclose(info_dict["excess"], 5525.756, rtol=1e-3)
    assert_allclose(info_dict["exposure_min"].value, 8.32e8, rtol=1e-3)
    assert_allclose(info_dict["exposure_max"].value, 1.105e10, rtol=1e-3)
    assert info_dict["exposure_max"].unit == "m2 s"
    assert info_dict["name"] == "test"

    gti = GTI.create([0 * u.s], [1 * u.h], reference_time="2010-01-01T00:00:00")
    dataset.gti = gti
    info_dict = dataset.info_dict()
    assert_allclose(info_dict["counts"], 9526, rtol=1e-3)
    assert_allclose(info_dict["background"], 4000.0005, rtol=1e-3)
    assert_allclose(info_dict["npred_background"], 4000.0, rtol=1e-3)
    assert_allclose(info_dict["sqrt_ts"], 74.024180, rtol=1e-3)
    assert_allclose(info_dict["excess"], 5525.756, rtol=1e-3)
    assert_allclose(info_dict["ontime"].value, 3600)

    assert info_dict["name"] == "test"

    # try to dump as json
    result = json.dumps(info_dict, cls=JsonQuantityEncoder)
    assert "counts" in result

420
    counts = Map.read("$GAMMAPY_DATA/fermi-3fhl-gc/fermi-3fhl-gc-counts-cube.fits.gz")
421
    background = Map.read(
422
        "$GAMMAPY_DATA/fermi-3fhl-gc/fermi-3fhl-gc-background-cube.fits.gz"
423
    )
424
    bkg_model = FoVBackgroundModel(dataset_name="fermi-3fhl-gc")
425
426
    exposure = Map.read(
427
        "$GAMMAPY_DATA/fermi-3fhl-gc/fermi-3fhl-gc-exposure-cube.fits.gz"
428
    )
429
    return MapDataset(
430
        counts=counts,
431
        background=background,
432
        models=[bkg_model],
433
        exposure=exposure,
434
        name="fermi-3fhl-gc",
435
    )
436
437
438
@requires_data()
439
def test_resample_energy_3fhl():
440
    dataset = get_fermi_3fhl_gc_dataset()
441
442
    new_axis = MapAxis.from_edges([10, 35, 100] * u.GeV, interp="log", name="energy")
443
    grouped = dataset.resample_energy_axis(energy_axis=new_axis)
444
445
    assert grouped.counts.data.shape == (2, 200, 400)
446
    assert grouped.counts.data[0].sum() == 28581
447
    assert_allclose(
448
        grouped.npred_background().data.sum(axis=(1, 2)),
449
        [25074.366386, 2194.298612],
450
        rtol=1e-5,
451
    )
452
    assert_allclose(grouped.exposure.data, dataset.exposure.data, rtol=1e-5)
453
454
    axis = grouped.counts.geom.axes[0]
455
    npred = dataset.npred()
456
    npred_grouped = grouped.npred()
457
    assert_allclose(npred.resample_axis(axis=axis).data.sum(), npred_grouped.data.sum())
458
459
460
@requires_data()
461
def test_to_image_3fhl():
462
    dataset = get_fermi_3fhl_gc_dataset()
463
464
    dataset_im = dataset.to_image()
465
466
    assert dataset_im.counts.data.sum() == dataset.counts.data.sum()
467
    assert_allclose(dataset_im.npred_background().data.sum(), 28548.625, rtol=1e-5)
468
    assert_allclose(dataset_im.exposure.data, dataset.exposure.data, rtol=1e-5)
469
470
    npred = dataset.npred()
471
    npred_im = dataset_im.npred()
472
    assert_allclose(npred.data.sum(), npred_im.data.sum())
473
474
475
def test_to_image_mask_safe():
476
    axis = MapAxis.from_energy_bounds("0.1 TeV", "10 TeV", nbin=2)
477
    geom = WcsGeom.create(
478
        skydir=(0, 0), binsz=0.5, width=(1, 1), frame="icrs", axes=[axis]
479
    )
480
    dataset = MapDataset.create(geom)
481
482
    # Check map_safe handling
483
    data = np.array([[[False, True], [True, True]], [[False, False], [True, True]]])
484
    dataset.mask_safe = WcsNDMap.from_geom(geom=geom, data=data)
485
486
    dataset_im = dataset.to_image()
487
    assert dataset_im.mask_safe.data.dtype == bool
488
489
    desired = np.array([[False, True], [True, True]])
490
    assert (dataset_im.mask_safe.data == desired).all()
491
492
    # Check that missing entries in the dataset do not break
493
    dataset_copy = dataset.copy()
494
    dataset_copy.exposure = None
495
    dataset_im = dataset_copy.to_image()
496
    assert dataset_im.exposure is None
497
498
    dataset_copy = dataset.copy()
499
    dataset_copy.counts = None
500
    dataset_im = dataset_copy.to_image()
501
    assert dataset_im.counts is None
502
503
504
@requires_data()
505
def test_downsample():
506
    dataset = get_fermi_3fhl_gc_dataset()
507
508
    downsampled = dataset.downsample(2)
509
510
    assert downsampled.counts.data.shape == (11, 100, 200)
511
    assert downsampled.counts.data.sum() == dataset.counts.data.sum()
512
    assert_allclose(
513
        downsampled.npred_background().data.sum(axis=(1, 2)),
514
        dataset.npred_background().data.sum(axis=(1, 2)),
515
        rtol=1e-5,
516
    )
517
    assert_allclose(downsampled.exposure.data[5, 50, 100], 3.318082e11, rtol=1e-5)
518
519
    with pytest.raises(ValueError):
520
        dataset.downsample(2, axis_name="energy")
521
522
523
def test_downsample_energy(geom, geom_etrue):
524
    # This checks that downsample and resample_energy_axis give identical results
525
    counts = Map.from_geom(geom, dtype="int")
526
    counts += 1
527
    mask = Map.from_geom(geom, dtype="bool")
528
    mask.data[1:] = True
529
    counts += 1
530
    exposure = Map.from_geom(geom_etrue, unit="m2s")
531
    edisp = EDispKernelMap.from_gauss(geom.axes[0], geom_etrue.axes[0], 0.1, 0.0)
532
    dataset = MapDataset(
533
        counts=counts,
534
        exposure=exposure,
535
        mask_safe=mask,
536
        edisp=edisp,
537
    )
538
    dataset_downsampled = dataset.downsample(2, axis_name="energy")
539
    dataset_resampled = dataset.resample_energy_axis(geom.axes[0].downsample(2))
540
541
    assert dataset_downsampled.edisp.edisp_map.data.shape == (3, 1, 1, 2)
542
    assert_allclose(
543
        dataset_downsampled.edisp.edisp_map.data[:, :, 0, 0],
544
        dataset_resampled.edisp.edisp_map.data[:, :, 0, 0],
545
    )
546
547
548
@requires_data()
549
def test_map_dataset_fits_io(tmp_path, sky_model, geom, geom_etrue):
550
    dataset = get_map_dataset(geom, geom_etrue)
551
552
    bkg_model = FoVBackgroundModel(dataset_name=dataset.name)
553
    dataset.models = [sky_model, bkg_model]
554
555
    dataset.counts = dataset.npred()
556
    dataset.mask_safe = dataset.mask_fit
557
    gti = GTI.create([0 * u.s], [1 * u.h], reference_time="2010-01-01T00:00:00")
558
    dataset.gti = gti
559
560
    hdulist = dataset.to_hdulist()
561
562
    actual = [hdu.name for hdu in hdulist]
563
564
    desired = [
565
        "PRIMARY",
566
        "COUNTS",
567
        "COUNTS_BANDS",
568
        "EXPOSURE",
569
        "EXPOSURE_BANDS",
570
        "BACKGROUND",
571
        "BACKGROUND_BANDS",
572
        "EDISP",
573
        "EDISP_BANDS",
574
        "EDISP_EXPOSURE",
575
        "EDISP_EXPOSURE_BANDS",
576
        "PSF",
577
        "PSF_BANDS",
578
        "PSF_EXPOSURE",
579
        "PSF_EXPOSURE_BANDS",
580
        "MASK_SAFE",
581
        "MASK_SAFE_BANDS",
582
        "MASK_FIT",
583
        "MASK_FIT_BANDS",
584
        "GTI",
585
    ]
586
587
    assert actual == desired
588
589
    dataset.write(tmp_path / "test.fits")
590
591
    dataset_new = MapDataset.read(tmp_path / "test.fits")
592
593
    assert dataset_new.name == "test"
594
595
    assert dataset_new.mask.data.dtype == bool
596
597
    assert_allclose(dataset.counts.data, dataset_new.counts.data)
598
    assert_allclose(
599
        dataset.npred_background().data, dataset_new.npred_background().data
600
    )
601
    assert_allclose(dataset.edisp.edisp_map.data, dataset_new.edisp.edisp_map.data)
602
    assert_allclose(dataset.psf.psf_map.data, dataset_new.psf.psf_map.data)
603
    assert_allclose(dataset.exposure.data, dataset_new.exposure.data)
604
    assert_allclose(dataset.mask_fit.data, dataset_new.mask_fit.data)
605
    assert_allclose(dataset.mask_safe.data, dataset_new.mask_safe.data)
606
607
    assert dataset.counts.geom == dataset_new.counts.geom
608
    assert dataset.exposure.geom == dataset_new.exposure.geom
609
610
    assert_allclose(dataset.exposure.meta["livetime"], 1 * u.h)
611
    assert dataset.npred_background().geom == dataset_new.npred_background().geom
612
    assert dataset.edisp.edisp_map.geom == dataset_new.edisp.edisp_map.geom
613
614
    assert_allclose(
615
        dataset.gti.time_sum.to_value("s"), dataset_new.gti.time_sum.to_value("s")
616
    )
617
618
    # To test io of psf and edisp map
619
    stacked = MapDataset.create(geom)
620
    stacked.write(tmp_path / "test-2.fits", overwrite=True)
621
    stacked1 = MapDataset.read(tmp_path / "test-2.fits")
622
    assert stacked1.psf.psf_map is not None
623
    assert stacked1.psf.exposure_map is not None
624
    assert stacked1.edisp.edisp_map is not None
625
    assert stacked1.edisp.exposure_map is not None
626
    assert stacked.mask.data.dtype == bool
627
628
    assert_allclose(stacked1.psf.psf_map, stacked.psf.psf_map)
629
    assert_allclose(stacked1.edisp.edisp_map, stacked.edisp.edisp_map)
630
631
632
@requires_data()
633
def test_map_fit(sky_model, geom, geom_etrue):
    dataset_1 = get_map_dataset(geom, geom_etrue, name="test-1")
    dataset_2 = get_map_dataset(geom, geom_etrue, name="test-2")
    datasets = Datasets([dataset_1, dataset_2])

    models = Models(datasets.models)
    models.insert(0, sky_model)

    models["test-1-bkg"].spectral_model.norm.value = 0.5
    models["test-model"].spatial_model.sigma.frozen = True

    datasets.models = models
    dataset_2.counts = dataset_2.npred()
    dataset_1.counts = dataset_1.npred()

    models["test-1-bkg"].spectral_model.norm.value = 0.49
    models["test-2-bkg"].spectral_model.norm.value = 0.99

    fit = Fit()
    result = fit.run(datasets=datasets)

    assert result.success
    assert "minuit" in repr(result)

    npred = dataset_1.npred().data.sum()
    assert_allclose(npred, 7525.790688, rtol=1e-3)
    assert_allclose(result.total_stat, 21625.845714, rtol=1e-3)

    pars = models.parameters
    assert_allclose(pars["lon_0"].value, 0.2, rtol=1e-2)
    assert_allclose(pars["lon_0"].error, 0.002244, rtol=1e-2)

    assert_allclose(pars["index"].value, 3, rtol=1e-2)
    assert_allclose(pars["index"].error, 0.0242, rtol=1e-2)

    assert_allclose(pars["amplitude"].value, 1e-11, rtol=1e-2)
    assert_allclose(pars["amplitude"].error, 4.216e-13, rtol=1e-2)

    # background norm 1
    assert_allclose(pars[8].value, 0.5, rtol=1e-2)
    assert_allclose(pars[8].error, 0.015811, rtol=1e-2)

    # background norm 2
    assert_allclose(pars[11].value, 1, rtol=1e-2)
    assert_allclose(pars[11].error, 0.02147, rtol=1e-2)

    # test mask_safe evaluation
    dataset_1.mask_safe = geom.energy_mask(energy_min=1 * u.TeV)
    dataset_2.mask_safe = geom.energy_mask(energy_min=1 * u.TeV)

    stat = datasets.stat_sum()
    assert_allclose(stat, 14823.772744, rtol=1e-5)

    region = sky_model.spatial_model.to_region()

    initial_counts = dataset_1.counts.copy()
    with mpl_plot_check():
        dataset_1.plot_residuals(kwargs_spectral=dict(region=region))

    # check dataset has not changed
    assert initial_counts == dataset_1.counts

    # test model evaluation outside image
    dataset_1.models[0].spatial_model.lon_0.value = 150
    dataset_1.npred()
    assert not dataset_1.evaluators["test-model"].contributes


@requires_data()
def test_map_fit_linked(sky_model, geom, geom_etrue):
    dataset_1 = get_map_dataset(geom, geom_etrue, name="test-1")
    dataset_2 = get_map_dataset(geom, geom_etrue, name="test-2")
    datasets = Datasets([dataset_1, dataset_2])

    models = Models(datasets.models)
    models.insert(0, sky_model)
    sky_model2 = sky_model.copy(name="test-model-2")
    sky_model2.spectral_model.index = sky_model.spectral_model.index
    sky_model2.spectral_model.reference = sky_model.spectral_model.reference

    models.insert(0, sky_model2)

    models["test-1-bkg"].spectral_model.norm.value = 0.5
    models["test-model"].spatial_model.sigma.frozen = True

    datasets.models = models
    dataset_2.counts = dataset_2.npred()
    dataset_1.counts = dataset_1.npred()

    models["test-1-bkg"].spectral_model.norm.value = 0.49
    models["test-2-bkg"].spectral_model.norm.value = 0.99

    fit = Fit()
    result = fit.run(datasets=datasets)

    assert result.success
    assert "minuit" in repr(result)

    assert sky_model2.parameters["index"] is sky_model.parameters["index"]
    assert sky_model2.parameters["reference"] is sky_model.parameters["reference"]

    assert len(datasets.models.parameters.unique_parameters) == 20
    assert datasets.models.covariance.shape == (22, 22)


@requires_data()
def test_map_fit_one_energy_bin(sky_model, geom_image):
    energy_axis = geom_image.axes["energy"]
    geom_etrue = geom_image.to_image().to_cube([energy_axis.copy(name="energy_true")])

    dataset = get_map_dataset(geom_image, geom_etrue)

    bkg_model = FoVBackgroundModel(dataset_name=dataset.name)
    dataset.models = [sky_model, bkg_model]

    sky_model.spectral_model.index.value = 3.0
    sky_model.spectral_model.index.frozen = True
    dataset.models[f"{dataset.name}-bkg"].spectral_model.norm.value = 0.5

    dataset.counts = dataset.npred()

    # Move a bit away from the best-fit point, to make sure the optimiser runs
    sky_model.parameters["sigma"].value = 0.21
    dataset.models[f"{dataset.name}-bkg"].parameters["norm"].frozen = True

    fit = Fit()
    result = fit.run(datasets=[dataset])

    assert result.success

    npred = dataset.npred().data.sum()
    assert_allclose(npred, 16538.124036, rtol=1e-3)
    assert_allclose(result.total_stat, -34844.125047, rtol=1e-3)

    pars = sky_model.parameters

    assert_allclose(pars["lon_0"].value, 0.2, rtol=1e-2)
    assert_allclose(pars["lon_0"].error, 0.001689, rtol=1e-2)

    assert_allclose(pars["sigma"].value, 0.2, rtol=1e-2)
    assert_allclose(pars["sigma"].error, 0.00092, rtol=1e-2)

    assert_allclose(pars["amplitude"].value, 1e-11, rtol=1e-2)
    assert_allclose(pars["amplitude"].error, 8.127593e-14, rtol=1e-2)


def test_create():
    # tests empty datasets created
    rad_axis = MapAxis(nodes=np.linspace(0.0, 1.0, 51), unit="deg", name="rad")
    e_reco = MapAxis.from_edges(
        np.logspace(-1.0, 1.0, 3), name="energy", unit=u.TeV, interp="log"
    )
    e_true = MapAxis.from_edges(
        np.logspace(-1.0, 1.0, 4), name="energy_true", unit=u.TeV, interp="log"
    )
    geom = WcsGeom.create(binsz=0.02, width=(2, 2), axes=[e_reco])
    empty_dataset = MapDataset.create(
        geom=geom, energy_axis_true=e_true, rad_axis=rad_axis
    )

    assert empty_dataset.counts.data.shape == (2, 100, 100)

    assert empty_dataset.exposure.data.shape == (3, 100, 100)

    assert empty_dataset.psf.psf_map.data.shape == (3, 50, 10, 10)
    assert empty_dataset.psf.exposure_map.data.shape == (3, 1, 10, 10)

    assert isinstance(empty_dataset.edisp, EDispKernelMap)
    assert empty_dataset.edisp.edisp_map.data.shape == (3, 2, 10, 10)
    assert empty_dataset.edisp.exposure_map.data.shape == (3, 1, 10, 10)
    assert_allclose(empty_dataset.edisp.edisp_map.data.sum(), 300)

    assert_allclose(empty_dataset.gti.time_delta, 0.0 * u.s)


def test_create_with_migra(tmp_path):
    # tests empty datasets created
    migra_axis = MapAxis(nodes=np.linspace(0.0, 3.0, 51), unit="", name="migra")
    rad_axis = MapAxis(nodes=np.linspace(0.0, 1.0, 51), unit="deg", name="rad")
    e_reco = MapAxis.from_edges(
        np.logspace(-1.0, 1.0, 3), name="energy", unit=u.TeV, interp="log"
    )
    e_true = MapAxis.from_edges(
        np.logspace(-1.0, 1.0, 4), name="energy_true", unit=u.TeV, interp="log"
    )
    geom = WcsGeom.create(binsz=0.02, width=(2, 2), axes=[e_reco])
    empty_dataset = MapDataset.create(
        geom=geom, energy_axis_true=e_true, migra_axis=migra_axis, rad_axis=rad_axis
    )

    empty_dataset.write(tmp_path / "test.fits")

    dataset_new = MapDataset.read(tmp_path / "test.fits")

    assert isinstance(empty_dataset.edisp, EDispMap)
    assert empty_dataset.edisp.edisp_map.data.shape == (3, 50, 10, 10)
    assert empty_dataset.edisp.exposure_map.data.shape == (3, 1, 10, 10)
    assert_allclose(empty_dataset.edisp.edisp_map.data.sum(), 5000)

    assert_allclose(empty_dataset.gti.time_delta, 0.0 * u.s)

    assert isinstance(dataset_new.edisp, EDispMap)
    assert dataset_new.edisp.edisp_map.data.shape == (3, 50, 10, 10)

def test_stack(sky_model):
    axis = MapAxis.from_energy_bounds("0.1 TeV", "10 TeV", nbin=3)
    geom = WcsGeom.create(
        skydir=(266.40498829, -28.93617776),
        binsz=0.05,
        width=(2, 2),
        frame="icrs",
        axes=[axis],
    )
    axis_etrue = MapAxis.from_energy_bounds(
        "0.1 TeV", "10 TeV", nbin=5, name="energy_true"
    )
    geom_etrue = WcsGeom.create(
        skydir=(266.40498829, -28.93617776),
        binsz=0.05,
        width=(2, 2),
        frame="icrs",
        axes=[axis_etrue],
    )

    edisp = EDispKernelMap.from_diagonal_response(
        energy_axis=axis, energy_axis_true=axis_etrue, geom=geom
    )
    edisp.exposure_map.quantity = (
        1e0 * u.m**2 * u.s * np.ones(edisp.exposure_map.data.shape)
    )

    bkg1 = Map.from_geom(geom)
    bkg1.data += 0.2

    cnt1 = Map.from_geom(geom)
    cnt1.data = 1.0 * np.ones(cnt1.data.shape)

    exp1 = Map.from_geom(geom_etrue)
    exp1.quantity = 1e7 * u.m**2 * u.s * np.ones(exp1.data.shape)

    mask1 = Map.from_geom(geom)
    mask1.data = np.ones(mask1.data.shape, dtype=bool)
    mask1.data[0][:][5:10] = False
    dataset1 = MapDataset(
        counts=cnt1,
        background=bkg1,
        exposure=exp1,
        mask_safe=mask1,
        mask_fit=mask1,
        name="dataset-1",
        edisp=edisp,
        meta_table=Table({"OBS_ID": [0]}),
    )

    bkg2 = Map.from_geom(geom)
    bkg2.data = 0.1 * np.ones(bkg2.data.shape)

    cnt2 = Map.from_geom(geom)
    cnt2.data = 1.0 * np.ones(cnt2.data.shape)

    exp2 = Map.from_geom(geom_etrue)
    exp2.quantity = 1e7 * u.m**2 * u.s * np.ones(exp2.data.shape)

    mask2 = Map.from_geom(geom)
    mask2.data = np.ones(mask2.data.shape, dtype=bool)
    mask2.data[0][:][5:10] = False
    mask2.data[1][:][10:15] = False

    dataset2 = MapDataset(
        counts=cnt2,
        background=bkg2,
        exposure=exp2,
        mask_safe=mask2,
        mask_fit=mask2,
        name="dataset-2",
        edisp=edisp,
        meta_table=Table({"OBS_ID": [1]}),
    )

    background_model2 = FoVBackgroundModel(dataset_name="dataset-2")
    background_model1 = FoVBackgroundModel(dataset_name="dataset-1")

    dataset1.models = [background_model1, sky_model]
    dataset2.models = [background_model2, sky_model]

    stacked = MapDataset.from_geoms(**dataset1.geoms)
    stacked.stack(dataset1)
    stacked.stack(dataset2)

    stacked.models = [sky_model]
    npred_b = stacked.npred()

    assert_allclose(npred_b.data.sum(), 1459.985035, 1e-5)
    assert_allclose(stacked.npred_background().data.sum(), 1360.00, 1e-5)
    assert_allclose(stacked.counts.data.sum(), 9000, 1e-5)
    assert_allclose(stacked.mask_safe.data.sum(), 4600)
    assert_allclose(stacked.mask_fit.data.sum(), 4600)
    assert_allclose(stacked.exposure.data.sum(), 1.6e11)

    assert_allclose(stacked.meta_table["OBS_ID"][0], [0, 1])


@requires_data()
def test_npred(sky_model, geom, geom_etrue):
    dataset = get_map_dataset(geom, geom_etrue)

    pwl = PowerLawSpectralModel()
    gauss = GaussianSpatialModel(
        lon_0="0.0 deg", lat_0="0.0 deg", sigma="0.5 deg", frame="galactic"
    )
    model1 = SkyModel(pwl, gauss, name="m1")

    bkg = FoVBackgroundModel(dataset_name=dataset.name)
    dataset.models = [bkg, sky_model, model1]

    assert_allclose(
        dataset.npred_signal(model_name=model1.name).data.sum(), 150.7487, rtol=1e-3
    )
    assert dataset._background_cached is None
    assert_allclose(dataset.npred_background().data.sum(), 4000.0, rtol=1e-3)
    assert_allclose(dataset._background_cached.data.sum(), 4000.0, rtol=1e-3)

    assert_allclose(dataset.npred().data.sum(), 9676.047906, rtol=1e-3)
    assert_allclose(dataset.npred_signal().data.sum(), 5676.04790, rtol=1e-3)

    bkg.spectral_model.norm.value = 1.1
    assert_allclose(dataset.npred_background().data.sum(), 4400.0, rtol=1e-3)
    assert_allclose(dataset._background_cached.data.sum(), 4400.0, rtol=1e-3)

    with pytest.raises(
        KeyError,
        match="m2",
    ):
        dataset.npred_signal(model_name="m2")


def test_stack_npred():
    pwl = PowerLawSpectralModel()
    gauss = GaussianSpatialModel(sigma="0.2 deg")
    model = SkyModel(pwl, gauss)

    axis = MapAxis.from_energy_bounds("0.1 TeV", "10 TeV", nbin=5)
    axis_etrue = MapAxis.from_energy_bounds(
        "0.1 TeV", "10 TeV", nbin=11, name="energy_true"
    )

    geom = WcsGeom.create(
        skydir=(0, 0),
        binsz=0.05,
        width=(2, 2),
        frame="icrs",
        axes=[axis],
    )

    dataset_1 = MapDataset.create(
        geom,
        energy_axis_true=axis_etrue,
        name="dataset-1",
        gti=GTI.create("0 min", "30 min"),
    )
    dataset_1.psf = None
    dataset_1.exposure.data += 1
    dataset_1.mask_safe = geom.energy_mask(energy_min=1 * u.TeV)
    dataset_1.background.data += 1

    bkg_model_1 = FoVBackgroundModel(dataset_name=dataset_1.name)
    dataset_1.models = [model, bkg_model_1]

    dataset_2 = MapDataset.create(
        geom,
        energy_axis_true=axis_etrue,
        name="dataset-2",
        gti=GTI.create("30 min", "60 min"),
    )
    dataset_2.psf = None
    dataset_2.exposure.data += 1
    dataset_2.mask_safe = geom.energy_mask(energy_min=0.2 * u.TeV)
    dataset_2.background.data += 1

    bkg_model_2 = FoVBackgroundModel(dataset_name=dataset_2.name)
    dataset_2.models = [model, bkg_model_2]

    npred_1 = dataset_1.npred()
    npred_1.data[~dataset_1.mask_safe.data] = 0
    npred_2 = dataset_2.npred()
    npred_2.data[~dataset_2.mask_safe.data] = 0

    stacked_npred = Map.from_geom(geom)
    stacked_npred.stack(npred_1)
    stacked_npred.stack(npred_2)

    stacked = MapDataset.create(geom, energy_axis_true=axis_etrue, name="stacked")
    stacked.stack(dataset_1)
    stacked.stack(dataset_2)

    npred_stacked = stacked.npred()

    assert_allclose(npred_stacked.data, stacked_npred.data)


def to_cube(image):
    # introduce a fake energy axis for now
    axis = MapAxis.from_edges([1, 10] * u.TeV, name="energy")
    geom = image.geom.to_cube([axis])
    return WcsNDMap.from_geom(geom=geom, data=image.data)


@pytest.fixture
def images():
    """Load some `counts`, `counts_off`, `acceptance_on`, `acceptance_off" images"""
    filename = "$GAMMAPY_DATA/tests/unbundled/hess/survey/hess_survey_snippet.fits.gz"
    return {
        "counts": to_cube(WcsNDMap.read(filename, hdu="ON")),
        "counts_off": to_cube(WcsNDMap.read(filename, hdu="OFF")),
        "acceptance": to_cube(WcsNDMap.read(filename, hdu="ONEXPOSURE")),
        "acceptance_off": to_cube(WcsNDMap.read(filename, hdu="OFFEXPOSURE")),
        "exposure": to_cube(WcsNDMap.read(filename, hdu="EXPGAMMAMAP")),
        "background": to_cube(WcsNDMap.read(filename, hdu="BACKGROUND")),
    }

def test_npred_psf_after_edisp():
    energy_axis = MapAxis.from_energy_bounds("1 TeV", "10 TeV", nbin=3)
    energy_axis_true = MapAxis.from_energy_bounds(
        "0.8 TeV", "15 TeV", nbin=6, name="energy_true"
    )

    geom = WcsGeom.create(width=4 * u.deg, binsz=0.02, axes=[energy_axis])
    dataset = MapDataset.create(geom=geom, energy_axis_true=energy_axis_true)
    dataset.background.data += 1
    dataset.exposure.data += 1e12
    dataset.mask_safe.data += True
    dataset.psf = PSFMap.from_gauss(
        energy_axis_true=energy_axis_true, sigma=0.2 * u.deg
    )

    model = SkyModel(
        spectral_model=PowerLawSpectralModel(),
        spatial_model=PointSpatialModel(),
        name="test-model",
    )

    model.apply_irf["psf_after_edisp"] = True

    bkg_model = FoVBackgroundModel(dataset_name=dataset.name)
    dataset.models = [bkg_model, model]

    npred = dataset.npred()
    assert_allclose(npred.data.sum(), 129553.858658)


def get_map_dataset_onoff(images, **kwargs):
    """Returns a MapDatasetOnOff"""
    mask_geom = images["counts"].geom
    mask_data = np.ones(images["counts"].data.shape, dtype=bool)
    mask_safe = Map.from_geom(mask_geom, data=mask_data)
    gti = GTI.create([0 * u.s], [1 * u.h], reference_time="2010-01-01T00:00:00")
    energy_axis = mask_geom.axes["energy"]
    energy_axis_true = energy_axis.copy(name="energy_true")

    psf = PSFMap.from_gauss(
        energy_axis_true=energy_axis_true, sigma=0.2 * u.deg, geom=mask_geom.to_image()
    )

    edisp = EDispKernelMap.from_diagonal_response(
        energy_axis=energy_axis, energy_axis_true=energy_axis_true, geom=mask_geom
    )

    return MapDatasetOnOff(
        counts=images["counts"],
        counts_off=images["counts_off"],
        acceptance=images["acceptance"],
        acceptance_off=images["acceptance_off"],
        exposure=images["exposure"],
        mask_safe=mask_safe,
        psf=psf,
        edisp=edisp,
        gti=gti,
        name="MapDatasetOnOff-test",
        **kwargs,
    )


@requires_data()
@pytest.mark.parametrize("lazy", [False, True])
def test_map_dataset_on_off_fits_io(images, lazy, tmp_path):
    dataset = get_map_dataset_onoff(images)
    dataset.meta_table = Table(data=[[1.0 * u.h], [111]], names=["livetime", "obs_id"])
    gti = GTI.create([0 * u.s], [1 * u.h], reference_time="2010-01-01T00:00:00")
    dataset.gti = gti

    hdulist = dataset.to_hdulist()
    actual = [hdu.name for hdu in hdulist]

    desired = [
        "PRIMARY",
        "COUNTS",
        "COUNTS_BANDS",
        "EXPOSURE",
        "EXPOSURE_BANDS",
        "EDISP",
        "EDISP_BANDS",
        "EDISP_EXPOSURE",
        "EDISP_EXPOSURE_BANDS",
        "PSF",
        "PSF_BANDS",
        "PSF_EXPOSURE",
        "PSF_EXPOSURE_BANDS",
        "MASK_SAFE",
        "MASK_SAFE_BANDS",
        "GTI",
        "META_TABLE",
        "COUNTS_OFF",
        "COUNTS_OFF_BANDS",
        "ACCEPTANCE",
        "ACCEPTANCE_BANDS",
        "ACCEPTANCE_OFF",
        "ACCEPTANCE_OFF_BANDS",
    ]

    assert actual == desired

    dataset.write(tmp_path / "test.fits")

    if lazy:
        with pytest.raises(NotImplementedError):
            dataset_new = MapDatasetOnOff.read(tmp_path / "test.fits", lazy=lazy)
    else:
        dataset_new = MapDatasetOnOff.read(tmp_path / "test.fits", lazy=lazy)
        assert dataset_new.name == "MapDatasetOnOff-test"
        assert dataset_new.mask.data.dtype == bool
        assert dataset_new.meta_table["livetime"] == 1.0 * u.h
        assert dataset_new.meta_table["obs_id"] == 111

        assert_allclose(dataset.counts.data, dataset_new.counts.data)
        assert_allclose(dataset.counts_off.data, dataset_new.counts_off.data)
        assert_allclose(dataset.acceptance.data, dataset_new.acceptance.data)
        assert_allclose(dataset.acceptance_off.data, dataset_new.acceptance_off.data)
        assert_allclose(dataset.exposure.data, dataset_new.exposure.data)
        assert_allclose(dataset.mask_safe, dataset_new.mask_safe)

        assert np.all(dataset.mask_safe.data == dataset_new.mask_safe.data)
        assert dataset.mask_safe.geom == dataset_new.mask_safe.geom
        assert dataset.counts.geom == dataset_new.counts.geom
        assert dataset.exposure.geom == dataset_new.exposure.geom

        assert_allclose(
            dataset.gti.time_sum.to_value("s"), dataset_new.gti.time_sum.to_value("s")
        )

        assert dataset.psf.psf_map == dataset_new.psf.psf_map
        assert dataset.psf.exposure_map == dataset_new.psf.exposure_map
        assert dataset.edisp.edisp_map == dataset_new.edisp.edisp_map
        assert dataset.edisp.exposure_map == dataset_new.edisp.exposure_map


@requires_data()
def test_map_datasets_on_off_fits_io(images, tmp_path):
    dataset = get_map_dataset_onoff(images)
    Datasets([dataset]).write(tmp_path / "test.yaml")
    datasets = Datasets.read(tmp_path / "test.yaml", lazy=False)
    with pytest.raises(NotImplementedError):
        datasets = Datasets.read(tmp_path / "test.yaml", lazy=True)

    dataset_new = datasets[0]

    assert dataset.name == dataset_new.name
    assert_allclose(dataset.counts.data, dataset_new.counts.data)
    assert_allclose(dataset.counts_off.data, dataset_new.counts_off.data)
    assert_allclose(dataset.acceptance.data, dataset_new.acceptance.data)
    assert_allclose(dataset.acceptance_off.data, dataset_new.acceptance_off.data)
    assert_allclose(dataset.exposure.data, dataset_new.exposure.data)
    assert_allclose(dataset.mask_safe, dataset_new.mask_safe)


def test_create_onoff(geom):
    # tests empty datasets created

    migra_axis = MapAxis(nodes=np.linspace(0.0, 3.0, 51), unit="", name="migra")
    rad_axis = MapAxis(nodes=np.linspace(0.0, 1.0, 51), unit="deg", name="rad")
    energy_axis = geom.axes["energy"].copy(name="energy_true")

    empty_dataset = MapDatasetOnOff.create(geom, energy_axis, migra_axis, rad_axis)

    assert_allclose(empty_dataset.counts.data.sum(), 0.0)
    assert_allclose(empty_dataset.counts_off.data.sum(), 0.0)
    assert_allclose(empty_dataset.acceptance.data.sum(), 0.0)
    assert_allclose(empty_dataset.acceptance_off.data.sum(), 0.0)

    assert empty_dataset.psf.psf_map.data.shape == (2, 50, 10, 10)
    assert empty_dataset.psf.exposure_map.data.shape == (2, 1, 10, 10)

    assert empty_dataset.edisp.edisp_map.data.shape == (2, 50, 10, 10)
    assert empty_dataset.edisp.exposure_map.data.shape == (2, 1, 10, 10)

    assert_allclose(empty_dataset.edisp.edisp_map.data.sum(), 3333.333333)

    assert_allclose(empty_dataset.gti.time_delta, 0.0 * u.s)


@requires_data()
def test_map_dataset_onoff_str(images):
    dataset = get_map_dataset_onoff(images)
    assert "MapDatasetOnOff" in str(dataset)


@requires_data()
def test_stack_onoff(images):
    dataset = get_map_dataset_onoff(images)
    stacked = dataset.copy()
    stacked.stack(dataset)

    assert_allclose(stacked.counts.data.sum(), 2 * dataset.counts.data.sum())
    assert_allclose(stacked.counts_off.data.sum(), 2 * dataset.counts_off.data.sum())
    assert_allclose(
        stacked.acceptance.data.sum(), dataset.data_shape[1] * dataset.data_shape[2]
    )
    assert_allclose(np.nansum(stacked.acceptance_off.data), 2.925793e08, rtol=1e-5)
    assert_allclose(stacked.exposure.data, 2.0 * dataset.exposure.data)


def test_dataset_cutout_aligned(geom):
    dataset = MapDataset.create(geom)

    kwargs = {"position": geom.center_skydir, "width": 1 * u.deg}
    geoms = {name: geom.cutout(**kwargs) for name, geom in dataset.geoms.items()}

    cutout = MapDataset.from_geoms(**geoms, name="cutout")

    assert dataset.counts.geom.is_aligned(cutout.counts.geom)
    assert dataset.exposure.geom.is_aligned(cutout.exposure.geom)
    assert dataset.edisp.edisp_map.geom.is_aligned(cutout.edisp.edisp_map.geom)
    assert dataset.psf.psf_map.geom.is_aligned(cutout.psf.psf_map.geom)


def test_stack_onoff_cutout(geom_image):
    # Test stacking of cutouts
    energy_axis_true = MapAxis.from_energy_bounds(
        "1 TeV", "10 TeV", nbin=3, name="energy_true"
    )

    dataset = MapDatasetOnOff.create(geom_image, energy_axis_true=energy_axis_true)
    dataset.gti = GTI.create([0 * u.s], [1 * u.h], reference_time="2010-01-01T00:00:00")

    kwargs = {"position": geom_image.center_skydir, "width": 1 * u.deg}
    geoms = {name: geom.cutout(**kwargs) for name, geom in dataset.geoms.items()}

    dataset_cutout = MapDatasetOnOff.from_geoms(**geoms, name="cutout-dataset")
    dataset_cutout.gti = GTI.create(
        [0 * u.s], [1 * u.h], reference_time="2010-01-01T00:00:00"
    )
    dataset_cutout.mask_safe.data += True
    dataset_cutout.counts.data += 1
    dataset_cutout.counts_off.data += 1
    dataset_cutout.exposure.data += 1

    dataset.stack(dataset_cutout)

    assert_allclose(dataset.counts.data.sum(), 2500)
    assert_allclose(dataset.counts_off.data.sum(), 2500)
    assert_allclose(dataset.alpha.data.sum(), 0)
    assert_allclose(dataset.exposure.data.sum(), 7500)
    assert dataset_cutout.name == "cutout-dataset"

def test_datasets_io_no_model(tmpdir):
1300
    axis = MapAxis.from_energy_bounds("1 TeV", "10 TeV", nbin=2)
1301
    geom = WcsGeom.create(npix=(5, 5), axes=[axis])
1302
    dataset_1 = MapDataset.create(geom, name="dataset_1")
1303
    dataset_2 = MapDataset.create(geom, name="dataset_2")
1304
1305
    datasets = Datasets([dataset_1, dataset_2])
1306
1307
    datasets.write(filename=tmpdir / "datasets.yaml")
1308
1309
    filename_1 = tmpdir / "dataset_1.fits"
1310
    assert filename_1.exists()
1311
1312
    filename_2 = tmpdir / "dataset_2.fits"
1313
    assert filename_2.exists()
1314
1315
1316
@requires_data()
1317
def test_map_dataset_on_off_to_spectrum_dataset(images):
1318
    dataset = get_map_dataset_onoff(images)
1319
1320
    gti = GTI.create([0 * u.s], [1 * u.h], reference_time="2010-01-01T00:00:00")
1321
    dataset.gti = gti
1322
1323
    on_region = CircleSkyRegion(
1324
        center=dataset.counts.geom.center_skydir, radius=0.1 * u.deg
1325
    )
1326
1327
    spectrum_dataset = dataset.to_spectrum_dataset(on_region)
1328
1329
    assert spectrum_dataset.counts.data[0] == 8
1330
    assert spectrum_dataset.data_shape == (1, 1, 1)
1331
    assert spectrum_dataset.counts_off.data[0] == 33914
1332
    assert_allclose(spectrum_dataset.alpha.data[0], 0.0002143, atol=1e-7)
1333
1334
    excess_map = images["counts"] - images["background"]
1335
    excess_true = excess_map.get_spectrum(on_region, np.sum).data[0]
1336
1337
    excess = spectrum_dataset.excess.data[0]
1338
    assert_allclose(excess, excess_true, rtol=1e-3)
1339
1340
    assert spectrum_dataset.name != dataset.name
1341
1342
1343
@requires_data()
1344
def test_map_dataset_on_off_to_spectrum_dataset_weights():
1345
    e_reco = MapAxis.from_bounds(1, 10, nbin=3, unit="TeV", name="energy")
1346
1347
    geom = WcsGeom.create(
1348
        skydir=(0, 0), width=(2.5, 2.5), binsz=0.5, axes=[e_reco], frame="galactic"
1349
    )
1350
    counts = Map.from_geom(geom)
1351
    counts.data += 1
1352
    counts_off = Map.from_geom(geom)
1353
    counts_off.data += 2
1354
    acceptance = Map.from_geom(geom)
1355
    acceptance.data += 1
1356
    acceptance_off = Map.from_geom(geom)
1357
    acceptance_off.data += 4
1358
1359
    weights = Map.from_geom(geom, dtype="bool")
1360
    weights.data[1:, 2:4, 2] = True
1361
1362
    gti = GTI.create([0 * u.s], [1 * u.h], reference_time="2010-01-01T00:00:00")
1363
1364
    dataset = MapDatasetOnOff(
1365
        counts=counts,
1366
        counts_off=counts_off,
1367
        acceptance=acceptance,
1368
        acceptance_off=acceptance_off,
1369
        mask_safe=weights,
1370
        gti=gti,
1371
    )
1372
1373
    on_region = CircleSkyRegion(
1374
        center=dataset.counts.geom.center_skydir, radius=1.5 * u.deg
1375
    )
1376
1377
    spectrum_dataset = dataset.to_spectrum_dataset(on_region)
1378
1379
    assert_allclose(spectrum_dataset.counts.data[:, 0, 0], [0, 2, 2])
1380
    assert_allclose(spectrum_dataset.counts_off.data[:, 0, 0], [0, 4, 4])
1381
    assert_allclose(spectrum_dataset.acceptance.data[:, 0, 0], [0, 0.08, 0.08])
1382
    assert_allclose(spectrum_dataset.acceptance_off.data[:, 0, 0], [0, 0.32, 0.32])
1383
    assert_allclose(spectrum_dataset.alpha.data[:, 0, 0], [0, 0.25, 0.25])
1384
1385
1386
@requires_data()
def test_map_dataset_on_off_cutout(images):
    dataset = get_map_dataset_onoff(images)
    gti = GTI.create([0 * u.s], [1 * u.h], reference_time="2010-01-01T00:00:00")
    dataset.gti = gti

    cutout_dataset = dataset.cutout(
        images["counts"].geom.center_skydir, ["1 deg", "1 deg"]
    )

    assert cutout_dataset.counts.data.shape == (1, 50, 50)
    assert cutout_dataset.counts_off.data.shape == (1, 50, 50)
    assert cutout_dataset.acceptance.data.shape == (1, 50, 50)
    assert cutout_dataset.acceptance_off.data.shape == (1, 50, 50)
    assert cutout_dataset.name != dataset.name


def test_map_dataset_on_off_fake(geom):
    rad_axis = MapAxis(nodes=np.linspace(0.0, 1.0, 51), unit="deg", name="rad")
    energy_true_axis = geom.axes["energy"].copy(name="energy_true")

    empty_dataset = MapDatasetOnOff.create(geom, energy_true_axis, rad_axis=rad_axis)
    empty_dataset.acceptance.data = 1.0
    empty_dataset.acceptance_off.data = 10.0

    empty_dataset.acceptance_off.data[0, 50, 50] = 0
    background_map = Map.from_geom(geom, data=1)
    empty_dataset.fake(background_map, random_state=42)

    assert_allclose(empty_dataset.counts.data[0, 50, 50], 0)
    assert_allclose(empty_dataset.counts.data.mean(), 0.99445, rtol=1e-3)
    assert_allclose(empty_dataset.counts_off.data.mean(), 10.00055, rtol=1e-3)
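

# Editor's note: hedged sketch, not part of the original module. It wraps the
# sampling pattern used in test_map_dataset_on_off_fake in a reusable helper;
# the helper name is illustrative. As the asserted mean values above suggest,
# ON counts follow the supplied background map while OFF counts scale with the
# acceptance_off / acceptance ratio (here 10).
def _sketch_fake_on_off_counts(empty_on_off_dataset, geom, seed=42):
    """Fill an empty MapDatasetOnOff with simulated ON and OFF counts."""
    background_map = Map.from_geom(geom, data=1)
    empty_on_off_dataset.fake(background_map, random_state=seed)
    return empty_on_off_dataset

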
@requires_data()
def test_map_dataset_on_off_to_image():
    axis = MapAxis.from_energy_bounds(1, 10, 2, unit="TeV")
    geom = WcsGeom.create(npix=(10, 10), binsz=0.05, axes=[axis])

    counts = Map.from_geom(geom, data=np.ones((2, 10, 10)))
    counts_off = Map.from_geom(geom, data=np.ones((2, 10, 10)))
    acceptance = Map.from_geom(geom, data=np.ones((2, 10, 10)))
    acceptance_off = Map.from_geom(geom, data=np.ones((2, 10, 10)))
    acceptance_off *= 2

    dataset = MapDatasetOnOff(
        counts=counts,
        counts_off=counts_off,
        acceptance=acceptance,
        acceptance_off=acceptance_off,
    )
    image_dataset = dataset.to_image()

    assert image_dataset.counts.data.shape == (1, 10, 10)
    assert image_dataset.acceptance_off.data.shape == (1, 10, 10)
    assert_allclose(image_dataset.acceptance, 2)
    assert_allclose(image_dataset.acceptance_off, 4)
    assert_allclose(image_dataset.counts_off, 2)
    assert image_dataset.name != dataset.name

    # Try again with a mask_safe applied
    mask_safe = Map.from_geom(geom, data=np.ones((2, 10, 10), dtype="bool"))
    mask_safe.data[0] = 0
    dataset.mask_safe = mask_safe
    image_dataset = dataset.to_image()

    assert_allclose(image_dataset.acceptance, 1)
    assert_allclose(image_dataset.acceptance_off, 2)
    assert_allclose(image_dataset.counts_off, 1)
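

# Editor's note: hedged sketch, not part of the original module. It spells out
# the energy-summing behaviour checked above; the helper name is illustrative.
# When a mask_safe is set, only the selected energy bins contribute to the
# summed image, which is what the second half of the test asserts.
def _sketch_on_off_to_image(dataset_on_off, mask_safe=None):
    """Collapse a MapDatasetOnOff over its energy axis into a single image."""
    if mask_safe is not None:
        # excluded energy bins are dropped from the sums
        dataset_on_off.mask_safe = mask_safe
    return dataset_on_off.to_image()

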
def test_map_dataset_geom(geom, sky_model):
    e_true = MapAxis.from_energy_bounds("1 TeV", "10 TeV", nbin=5, name="energy_true")
    dataset = MapDataset.create(geom, energy_axis_true=e_true)
    dataset.counts = None
    dataset.background = None

    npred = dataset.npred()
    assert npred.geom == geom

    dataset.mask_safe = None
    dataset.mask_fit = None

    with pytest.raises(ValueError):
        dataset._geom


@requires_data()
def test_names(geom, geom_etrue, sky_model):
    m = Map.from_geom(geom)
    m.quantity = 0.2 * np.ones(m.data.shape)
    background_model1 = FoVBackgroundModel(dataset_name="test")
    assert background_model1.name == "test-bkg"

    c_map1 = Map.from_geom(geom)
    c_map1.quantity = 0.3 * np.ones(c_map1.data.shape)

    model1 = sky_model.copy()
    assert model1.name != sky_model.name
    model1 = sky_model.copy(name="model1")
    assert model1.name == "model1"
    model2 = sky_model.copy(name="model2")

    dataset1 = MapDataset(
        counts=c_map1,
        models=Models([model1, model2, background_model1]),
        exposure=get_exposure(geom_etrue),
        background=m,
        name="test",
    )

    dataset2 = dataset1.copy()
    assert dataset2.name != dataset1.name
    assert dataset2.models is None

    dataset2 = dataset1.copy(name="dataset2")

    assert dataset2.name == "dataset2"
    assert dataset2.models is None


def test_stack_dataset_dataset_on_off():
    axis = MapAxis.from_edges([1, 10] * u.TeV, name="energy")
    geom = WcsGeom.create(width=1, axes=[axis])

    gti = GTI.create([0 * u.s], [1 * u.h])

    dataset = MapDataset.create(geom, gti=gti)
    dataset_on_off = MapDatasetOnOff.create(geom, gti=gti)
    dataset_on_off.mask_safe.data += True

    dataset_on_off.acceptance_off += 5
    dataset_on_off.acceptance += 1
    dataset_on_off.counts_off += 1
    dataset.stack(dataset_on_off)

    assert_allclose(dataset.npred_background().data, 0.166667, rtol=1e-3)


@requires_data()
def test_info_dict_on_off(images):
    dataset = get_map_dataset_onoff(images)
    info_dict = dataset.info_dict()
    assert_allclose(info_dict["counts"], 4299, rtol=1e-3)
    assert_allclose(info_dict["excess"], -22.52295, rtol=1e-3)
    assert_allclose(info_dict["exposure_min"].value, 1.739467e08, rtol=1e-3)
    assert_allclose(info_dict["exposure_max"].value, 3.4298378e09, rtol=1e-3)
    assert_allclose(info_dict["npred"], 4321.518, rtol=1e-3)
    assert_allclose(info_dict["counts_off"], 20407510.0, rtol=1e-3)
    assert_allclose(info_dict["acceptance"], 4272.7075, rtol=1e-3)
    assert_allclose(info_dict["acceptance_off"], 20175596.0, rtol=1e-3)
    assert_allclose(info_dict["alpha"], 0.00021176, rtol=1e-3)
    assert_allclose(info_dict["ontime"].value, 3600)


def test_slice_by_idx():
    axis = MapAxis.from_energy_bounds("0.1 TeV", "10 TeV", nbin=17)
    axis_etrue = MapAxis.from_energy_bounds(
        "0.1 TeV", "10 TeV", nbin=31, name="energy_true"
    )

    geom = WcsGeom.create(
        skydir=(0, 0),
        binsz=0.5,
        width=(2, 2),
        frame="icrs",
        axes=[axis],
    )
    dataset = MapDataset.create(geom=geom, energy_axis_true=axis_etrue, binsz_irf=0.5)

    slices = {"energy": slice(5, 10)}
    sub_dataset = dataset.slice_by_idx(slices)

    assert sub_dataset.counts.geom.data_shape == (5, 4, 4)
    assert sub_dataset.mask_safe.geom.data_shape == (5, 4, 4)
    assert sub_dataset.npred_background().geom.data_shape == (5, 4, 4)
    assert sub_dataset.exposure.geom.data_shape == (31, 4, 4)
    assert sub_dataset.edisp.edisp_map.geom.data_shape == (31, 5, 4, 4)
    assert sub_dataset.psf.psf_map.geom.data_shape == (31, 66, 4, 4)

    axis = sub_dataset.counts.geom.axes["energy"]
    assert_allclose(axis.edges[0].value, 0.387468, rtol=1e-5)

    slices = {"energy_true": slice(5, 10)}
    sub_dataset = dataset.slice_by_idx(slices)

    assert sub_dataset.counts.geom.data_shape == (17, 4, 4)
    assert sub_dataset.mask_safe.geom.data_shape == (17, 4, 4)
    assert sub_dataset.npred_background().geom.data_shape == (17, 4, 4)
    assert sub_dataset.exposure.geom.data_shape == (5, 4, 4)
    assert sub_dataset.edisp.edisp_map.geom.data_shape == (5, 17, 4, 4)
    assert sub_dataset.psf.psf_map.geom.data_shape == (5, 66, 4, 4)

    axis = sub_dataset.counts.geom.axes["energy"]
    assert_allclose(axis.edges[0].value, 0.1, rtol=1e-5)

    axis = sub_dataset.exposure.geom.axes["energy_true"]
    assert_allclose(axis.edges[0].value, 0.210175, rtol=1e-5)
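

# Editor's note: hedged sketch, not part of the original module. It captures the
# slicing pattern exercised above; the helper name is illustrative. Slicing the
# "energy" axis trims counts, background, mask and the reconstructed-energy axis
# of the energy dispersion, while slicing "energy_true" trims exposure, PSF and
# the true-energy axes instead, as the shape assertions above show.
def _sketch_select_energy_bins(dataset, first_bin, last_bin, axis_name="energy"):
    """Return a MapDataset restricted to a contiguous range of energy bins."""
    return dataset.slice_by_idx({axis_name: slice(first_bin, last_bin)})

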
def test_plot_residual_onoff():
    axis = MapAxis.from_energy_bounds(1, 10, 2, unit="TeV")
    geom = WcsGeom.create(npix=(10, 10), binsz=0.05, axes=[axis])

    counts = Map.from_geom(geom, data=np.ones((2, 10, 10)))
    counts_off = Map.from_geom(geom, data=np.ones((2, 10, 10)))
    acceptance = Map.from_geom(geom, data=np.ones((2, 10, 10)))
    acceptance_off = Map.from_geom(geom, data=np.ones((2, 10, 10)))
    acceptance_off *= 2

    dataset = MapDatasetOnOff(
        counts=counts,
        counts_off=counts_off,
        acceptance=acceptance,
        acceptance_off=acceptance_off,
    )
    with mpl_plot_check():
        dataset.plot_residuals_spatial()


def test_to_map_dataset():
    axis = MapAxis.from_energy_bounds(1, 10, 2, unit="TeV")
    geom = WcsGeom.create(npix=(10, 10), binsz=0.05, axes=[axis])

    counts = Map.from_geom(geom, data=np.ones((2, 10, 10)))
    counts_off = Map.from_geom(geom, data=np.ones((2, 10, 10)))
    acceptance = Map.from_geom(geom, data=np.ones((2, 10, 10)))
    acceptance_off = Map.from_geom(geom, data=np.ones((2, 10, 10)))
    acceptance_off *= 2

    dataset_onoff = MapDatasetOnOff(
        counts=counts,
        counts_off=counts_off,
        acceptance=acceptance,
        acceptance_off=acceptance_off,
    )

    dataset = dataset_onoff.to_map_dataset(name="ds")

    assert dataset.name == "ds"
    assert_allclose(dataset.npred_background().data.sum(), 100)
    assert isinstance(dataset, MapDataset)
    assert dataset.counts == dataset_onoff.counts
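

# Editor's note: hedged sketch, not part of the original module, restating the
# conversion checked in test_to_map_dataset; the helper and dataset names are
# illustrative. The ON counts are carried over unchanged and the OFF measurement
# becomes the background estimate (alpha * counts_off), which is why the
# background above sums to 100 for 200 pixels with alpha = 0.5 and counts_off = 1.
def _sketch_on_off_as_map_dataset(dataset_on_off, name="converted"):
    """Convert a MapDatasetOnOff into a plain MapDataset with a background map."""
    return dataset_on_off.to_map_dataset(name=name)

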
def test_downsample_onoff():
    axis = MapAxis.from_energy_bounds(1, 10, 4, unit="TeV")
    geom = WcsGeom.create(npix=(10, 10), binsz=0.05, axes=[axis])

    counts = Map.from_geom(geom, data=np.ones((4, 10, 10)))
    counts_off = Map.from_geom(geom, data=np.ones((4, 10, 10)))
    acceptance = Map.from_geom(geom, data=np.ones((4, 10, 10)))
    acceptance_off = Map.from_geom(geom, data=np.ones((4, 10, 10)))
    acceptance_off *= 2

    dataset_onoff = MapDatasetOnOff(
        counts=counts,
        counts_off=counts_off,
        acceptance=acceptance,
        acceptance_off=acceptance_off,
    )

    downsampled = dataset_onoff.downsample(2, axis_name="energy")

    assert downsampled.counts.data.shape == (2, 10, 10)
    assert downsampled.counts.data.sum() == dataset_onoff.counts.data.sum()
    assert downsampled.counts_off.data.sum() == dataset_onoff.counts_off.data.sum()
    assert_allclose(downsampled.alpha.data, 0.5)
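

# Editor's note: hedged sketch, not part of the original module. It restates the
# energy downsampling exercised above; the helper name is illustrative. Counts
# and counts_off are summed over the merged bins, so their totals are preserved,
# while alpha is recomputed from the combined acceptances, as asserted above.
def _sketch_downsample_energy(dataset_on_off, factor=2):
    """Merge the energy bins of a MapDatasetOnOff by the given integer factor."""
    return dataset_on_off.downsample(factor, axis_name="energy")

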
@requires_data()
def test_source_outside_geom(sky_model, geom, geom_etrue):
    dataset = get_map_dataset(geom, geom_etrue)
    dataset.edisp = get_edisp(geom, geom_etrue)

    models = dataset.models
    model = SkyModel(
        PowerLawSpectralModel(),
        DiskSpatialModel(lon_0=276.4 * u.deg, lat_0=-28.9 * u.deg, r_0=10 * u.deg),
    )

    assert not geom.to_image().contains(model.position)[0]
    dataset.models = models + [model]
    dataset.npred()
    model_npred = dataset.evaluators[model.name].compute_npred().data
    assert np.sum(np.isnan(model_npred)) == 0
    assert np.sum(~np.isfinite(model_npred)) == 0
    assert np.sum(model_npred) > 0


# Regression test for an issue where the model selection failed for a source
# whose position lies outside the dataset geometry.
@requires_data()
def test_source_outside_geom_fermi():
    dataset = MapDataset.read(
        "$GAMMAPY_DATA/fermi-3fhl-gc/fermi-3fhl-gc.fits.gz", format="gadf"
    )

    catalog = SourceCatalog3FHL()
    source = catalog["3FHL J1637.8-3448"]

    dataset.models = source.sky_model()
    npred = dataset.npred()

    assert_allclose(npred.data.sum(), 28548.63, rtol=1e-4)


def test_region_geom_io(tmpdir):
    axis = MapAxis.from_energy_bounds("1 TeV", "10 TeV", nbin=1)
    geom = RegionGeom.create("icrs;circle(0, 0, 0.2)", axes=[axis])

    dataset = MapDataset.create(geom, name="geom-test")

    filename = tmpdir / "test.fits"
    dataset.write(filename)

    dataset = MapDataset.read(filename, format="gadf")

    assert dataset.name == "geom-test"
    assert isinstance(dataset.counts.geom, RegionGeom)
    assert isinstance(dataset.edisp.edisp_map.geom, RegionGeom)
    assert isinstance(dataset.psf.psf_map.geom, RegionGeom)
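

# Editor's note: hedged sketch, not part of the original module. It captures the
# GADF round trip used by the I/O tests above; the helper name and the path
# argument are illustrative, and only calls already used in this module appear.
def _sketch_roundtrip_dataset(dataset, path):
    """Write a MapDataset to FITS and read it back in GADF format."""
    dataset.write(path)
    return MapDataset.read(path, format="gadf")

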
def test_dataset_mixed_geom(tmpdir):
    energy_axis = MapAxis.from_energy_bounds("1 TeV", "10 TeV", nbin=3)
    energy_axis_true = MapAxis.from_energy_bounds(
        "1 TeV", "10 TeV", nbin=7, name="energy_true"
    )

    rad_axis = MapAxis.from_bounds(0, 1, nbin=10, name="rad", unit="deg")

    geom = WcsGeom.create(npix=5, axes=[energy_axis])
    geom_exposure = WcsGeom.create(npix=5, axes=[energy_axis_true])

    geom_psf = RegionGeom.create(
        "icrs;circle(0, 0, 0.2)", axes=[rad_axis, energy_axis_true]
    )

    geom_edisp = RegionGeom.create(
        "icrs;circle(0, 0, 0.2)", axes=[energy_axis, energy_axis_true]
    )

    dataset = MapDataset.from_geoms(
        geom=geom, geom_exposure=geom_exposure, geom_psf=geom_psf, geom_edisp=geom_edisp
    )

    filename = tmpdir / "test.fits"
    dataset.write(filename)

    dataset = MapDataset.read(filename, format="gadf")

    assert isinstance(dataset.counts.geom, WcsGeom)
    assert isinstance(dataset.exposure.geom, WcsGeom)
    assert isinstance(dataset.background.geom, WcsGeom)

    assert isinstance(dataset.psf.psf_map.geom.region, CircleSkyRegion)
    assert isinstance(dataset.edisp.edisp_map.geom.region, CircleSkyRegion)

    geom_psf_reco = RegionGeom.create(
        "icrs;circle(0, 0, 0.2)", axes=[rad_axis, energy_axis]
    )

    dataset = MapDataset.from_geoms(
        geom=geom,
        geom_exposure=geom_exposure,
        geom_psf=geom_psf_reco,
        geom_edisp=geom_edisp,
    )
    assert dataset.psf.tag == "psf_map_reco"


@requires_data()
def test_map_dataset_region_geom_npred():
    dataset = MapDataset.read("$GAMMAPY_DATA/cta-1dc-gc/cta-1dc-gc.fits.gz")

    pwl = PowerLawSpectralModel()
    point = PointSpatialModel(lon_0="0 deg", lat_0="0 deg", frame="galactic")
    model_1 = SkyModel(pwl, point, name="model-1")

    pwl = PowerLawSpectralModel(amplitude="1e-11 TeV-1 cm-2 s-1")
    gauss = GaussianSpatialModel(
        lon_0="0.3 deg", lat_0="0.3 deg", sigma="0.5 deg", frame="galactic"
    )
    model_2 = SkyModel(pwl, gauss, name="model-2")

    dataset.models = [model_1, model_2]

    region = RegionGeom.create("galactic;circle(0, 0, 0.4)").region
    npred_ref = dataset.npred().to_region_nd_map(region)

    dataset_spec = dataset.to_region_map_dataset(region)
    dataset_spec.models = [model_1, model_2]

    npred = dataset_spec.npred()

    assert_allclose(npred_ref.data, npred.data, rtol=1e-2)


@requires_dependency("healpy")
def test_map_dataset_create_hpx_geom(geom_hpx):

    dataset = MapDataset.create(**geom_hpx, binsz_irf=10 * u.deg)

    assert isinstance(dataset.counts.geom, HpxGeom)
    assert dataset.counts.data.shape == (3, 12288)

    assert isinstance(dataset.background.geom, HpxGeom)
    assert dataset.background.data.shape == (3, 12288)

    assert isinstance(dataset.exposure.geom, HpxGeom)
    assert dataset.exposure.data.shape == (4, 12288)

    assert isinstance(dataset.edisp.edisp_map.geom, HpxGeom)
    assert dataset.edisp.edisp_map.data.shape == (4, 3, 768)

    assert isinstance(dataset.psf.psf_map.geom, HpxGeom)
    assert dataset.psf.psf_map.data.shape == (4, 66, 768)


@requires_dependency("healpy")
def test_map_dataset_create_hpx_geom_partial(geom_hpx_partial):

    dataset = MapDataset.create(**geom_hpx_partial, binsz_irf=2 * u.deg)

    assert isinstance(dataset.counts.geom, HpxGeom)
    assert dataset.counts.data.shape == (3, 90)

    assert isinstance(dataset.background.geom, HpxGeom)
    assert dataset.background.data.shape == (3, 90)

    assert isinstance(dataset.exposure.geom, HpxGeom)
    assert dataset.exposure.data.shape == (4, 90)

    assert isinstance(dataset.edisp.edisp_map.geom, HpxGeom)
    assert dataset.edisp.edisp_map.data.shape == (4, 3, 24)

    assert isinstance(dataset.psf.psf_map.geom, HpxGeom)
    assert dataset.psf.psf_map.data.shape == (4, 66, 24)


@requires_dependency("healpy")
def test_map_dataset_stack_hpx_geom(geom_hpx_partial, geom_hpx):

    dataset_all = MapDataset.create(**geom_hpx, binsz_irf=5 * u.deg)

    gti = GTI.create(start=0 * u.s, stop=30 * u.min)
    dataset_cutout = MapDataset.create(**geom_hpx_partial, binsz_irf=5 * u.deg, gti=gti)
    dataset_cutout.counts.data += 1
    dataset_cutout.background.data += 1
    dataset_cutout.exposure.data += 1
    dataset_cutout.mask_safe.data[...] = True

    dataset_all.stack(dataset_cutout)

    assert_allclose(dataset_all.counts.data.sum(), 3 * 90)
    assert_allclose(dataset_all.background.data.sum(), 3 * 90)
    assert_allclose(dataset_all.exposure.data.sum(), 4 * 90)
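

# Editor's note: hedged sketch, not part of the original module. It restates the
# stacking pattern checked above, where a partial-sky (cutout) HEALPix dataset is
# accumulated onto an all-sky one; the helper name is illustrative. Only pixels
# covered by the cutout and kept by its safe mask contribute to the stacked maps,
# consistent with the 90-pixel sums asserted above.
def _sketch_stack_hpx_cutout(dataset_all_sky, dataset_cutout):
    """Stack a partial-sky HEALPix MapDataset onto an all-sky MapDataset in place."""
    dataset_all_sky.stack(dataset_cutout)
    return dataset_all_sky

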
@requires_data()
@requires_dependency("healpy")
def test_map_dataset_hpx_geom_npred(geom_hpx_partial):
    hpx_geom = geom_hpx_partial["geom"]
    hpx_true = hpx_geom.to_image().to_cube([geom_hpx_partial["energy_axis_true"]])
    dataset = get_map_dataset(hpx_geom, hpx_true, edisp="edispkernelmap")

    pwl = PowerLawSpectralModel()
    point = PointSpatialModel(lon_0="110 deg", lat_0="75 deg", frame="galactic")
    sky_model = SkyModel(pwl, point)

    dataset.models = [sky_model]

    assert_allclose(dataset.npred().data.sum(), 54, rtol=1e-3)


@requires_data()
def test_peek(images):
    dataset = get_map_dataset_onoff(images)

    with mpl_plot_check():
        dataset.peek()