Passed
Push — master ( 84d8ca...3f7306 )
by Axel
03:10 queued 11s
created

gammapy.datasets.tests.test_map.test_npred()   A

Complexity

Conditions 2

Size

Total Lines 33
Code Lines 23

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 23
nop 3
dl 0
loc 33
rs 9.328
c 0
b 0
f 0
1
# Licensed under a 3-clause BSD style license - see LICENSE.rst
2
import pytest
3
import json
4
import numpy as np
5
from numpy.testing import assert_allclose
6
import astropy.units as u
7
from astropy.coordinates import SkyCoord
8
from astropy.table import Table
9
from regions import CircleSkyRegion
10
from gammapy.catalog import SourceCatalog3FHL
11
from gammapy.data import GTI
12
from gammapy.datasets import Datasets, MapDataset, MapDatasetOnOff
13
from gammapy.datasets.map import RAD_AXIS_DEFAULT
14
from gammapy.irf import (
15
    EDispKernelMap,
16
    EDispMap,
17
    EnergyDispersion2D,
18
    EffectiveAreaTable2D,
19
    EnergyDependentMultiGaussPSF,
20
    PSFMap,
21
)
22
23
from gammapy.makers.utils import make_map_exposure_true_energy, make_psf_map
24
from gammapy.maps import (
25
    Map,
26
    MapAxis,
27
    WcsGeom,
28
    WcsNDMap,
29
    RegionGeom,
30
    HpxGeom,
31
)
32
from gammapy.maps.io import JsonQuantityEncoder
33
from gammapy.modeling import Fit
34
from gammapy.modeling.models import (
35
    FoVBackgroundModel,
36
    GaussianSpatialModel,
37
    Models,
38
    PointSpatialModel,
39
    PowerLawSpectralModel,
40
    SkyModel,
41
    DiskSpatialModel,
42
)
43
from gammapy.utils.testing import mpl_plot_check, requires_data, requires_dependency
44
45
46
@pytest.fixture
47
def geom_hpx():
48
    axis = MapAxis.from_energy_bounds("1 TeV", "10 TeV", nbin=3)
49
50
    energy_axis_true = MapAxis.from_energy_bounds(
51
        "1 TeV", "10 TeV", nbin=4, name="energy_true"
52
    )
53
54
    geom = HpxGeom.create(nside=32, axes=[axis], frame="galactic")
55
56
    return {"geom": geom, "energy_axis_true": energy_axis_true}
57
58
59
@pytest.fixture
60
def geom_hpx_partial():
61
    axis = MapAxis.from_energy_bounds("1 TeV", "10 TeV", nbin=3)
62
63
    energy_axis_true = MapAxis.from_energy_bounds(
64
        "1 TeV", "10 TeV", nbin=4, name="energy_true"
65
    )
66
67
    geom = HpxGeom.create(
68
        nside=32, axes=[axis], frame="galactic", region="DISK(110.,75.,10.)"
69
    )
70
71
    return {"geom": geom, "energy_axis_true": energy_axis_true}
72
73
74
@pytest.fixture
75
def geom():
76
    axis = MapAxis.from_energy_bounds("0.1 TeV", "10 TeV", nbin=2)
77
    return WcsGeom.create(
78
        skydir=(266.40498829, -28.93617776),
79
        binsz=0.02,
80
        width=(2, 2),
81
        frame="icrs",
82
        axes=[axis],
83
    )
84
85
86
@pytest.fixture
87
def geom1():
88
    e_axis = MapAxis.from_energy_bounds("0.1 TeV", "10 TeV", nbin=20)
89
    t_axis = MapAxis.from_bounds(0, 10, 2, name="time", unit="s")
90
    return WcsGeom.create(
91
        skydir=(266.40498829, -28.93617776),
92
        binsz=0.02,
93
        width=(3, 2),
94
        frame="icrs",
95
        axes=[e_axis, t_axis],
96
    )
97
98
99
@pytest.fixture
100
def geom_etrue():
101
    axis = MapAxis.from_energy_bounds("0.1 TeV", "10 TeV", nbin=3, name="energy_true")
102
    return WcsGeom.create(
103
        skydir=(266.40498829, -28.93617776),
104
        binsz=0.02,
105
        width=(2, 2),
106
        frame="icrs",
107
        axes=[axis],
108
    )
109
110
111
@pytest.fixture
112
def geom_image():
113
    energy = np.logspace(-1.0, 1.0, 2)
114
    axis = MapAxis.from_edges(energy, name="energy", unit=u.TeV, interp="log")
115
    return WcsGeom.create(
116
        skydir=(0, 0), binsz=0.02, width=(2, 2), frame="galactic", axes=[axis]
117
    )
118
119
120
def get_exposure(geom_etrue):
121
    filename = (
122
        "$GAMMAPY_DATA/cta-1dc/caldb/data/cta/1dc/bcf/South_z20_50h/irf_file.fits"
123
    )
124
    aeff = EffectiveAreaTable2D.read(filename, hdu="EFFECTIVE AREA")
125
126
    exposure_map = make_map_exposure_true_energy(
127
        pointing=SkyCoord(1, 0.5, unit="deg", frame="galactic"),
128
        livetime=1 * u.hr,
129
        aeff=aeff,
130
        geom=geom_etrue,
131
    )
132
    return exposure_map
133
134
135
def get_psf():
136
    filename = (
137
        "$GAMMAPY_DATA/cta-1dc/caldb/data/cta/1dc/bcf/South_z20_50h/irf_file.fits"
138
    )
139
    psf = EnergyDependentMultiGaussPSF.read(filename, hdu="POINT SPREAD FUNCTION")
140
141
    geom = WcsGeom.create(
142
        skydir=(0, 0),
143
        frame="galactic",
144
        binsz=2,
145
        width=(2, 2),
146
        axes=[RAD_AXIS_DEFAULT, psf.axes["energy_true"]],
147
    )
148
149
    return make_psf_map(
150
        psf=psf,
151
        pointing=SkyCoord(0, 0.5, unit="deg", frame="galactic"),
152
        geom=geom,
153
        exposure_map=Map.from_geom(geom.squash("rad"), unit="cm2 s"),
154
    )
155
156
157
@requires_data()
158
def get_edisp(geom, geom_etrue):
159
    filename = "$GAMMAPY_DATA/hess-dl3-dr1/data/hess_dl3_dr1_obs_id_020136.fits.gz"
160
    edisp2d = EnergyDispersion2D.read(filename, hdu="EDISP")
161
    energy = geom.axes["energy"].edges
162
    energy_true = geom_etrue.axes["energy_true"].edges
163
    edisp_kernel = edisp2d.to_edisp_kernel(
164
        offset="1.2 deg", energy=energy, energy_true=energy_true
165
    )
166
    edisp = EDispKernelMap.from_edisp_kernel(edisp_kernel)
167
    return edisp
168
169
170
@pytest.fixture
171
def sky_model():
172
    spatial_model = GaussianSpatialModel(
173
        lon_0="0.2 deg", lat_0="0.1 deg", sigma="0.2 deg", frame="galactic"
174
    )
175
    spectral_model = PowerLawSpectralModel(
176
        index=3, amplitude="1e-11 cm-2 s-1 TeV-1", reference="1 TeV"
177
    )
178
    return SkyModel(
179
        spatial_model=spatial_model, spectral_model=spectral_model, name="test-model"
180
    )
181
182
183
def get_map_dataset(geom, geom_etrue, edisp="edispmap", name="test", **kwargs):
184
    """Returns a MapDataset"""
185
    # define background model
186
    background = Map.from_geom(geom)
187
    background.data += 0.2
188
189
    psf = get_psf()
190
    exposure = get_exposure(geom_etrue)
191
192
    e_reco = geom.axes["energy"]
193
    e_true = geom_etrue.axes["energy_true"]
194
195
    if edisp == "edispmap":
196
        edisp = EDispMap.from_diagonal_response(energy_axis_true=e_true)
197
        data = exposure.get_spectrum(geom.center_skydir).data
198
        edisp.exposure_map.data = np.repeat(data, 2, axis=-1)
199
    elif edisp == "edispkernelmap":
200
        edisp = EDispKernelMap.from_diagonal_response(
201
            energy_axis=e_reco, energy_axis_true=e_true
202
        )
203
        data = exposure.get_spectrum(geom.center_skydir).data
204
        edisp.exposure_map.data = np.repeat(data, 2, axis=-1)
205
    else:
206
        edisp = None
207
208
    # define fit mask
209
    center = SkyCoord("0.2 deg", "0.1 deg", frame="galactic")
210
    circle = CircleSkyRegion(center=center, radius=1 * u.deg)
211
    mask_fit = geom.region_mask([circle])
212
213
    models = FoVBackgroundModel(dataset_name=name)
214
215
    return MapDataset(
216
        models=models,
217
        exposure=exposure,
218
        background=background,
219
        psf=psf,
220
        edisp=edisp,
221
        mask_fit=mask_fit,
222
        name=name,
223
        **kwargs,
224
    )
225
226
227
@requires_data()
228
def test_map_dataset_str(sky_model, geom, geom_etrue):
229
    dataset = get_map_dataset(geom, geom_etrue)
230
231
    bkg_model = FoVBackgroundModel(dataset_name=dataset.name)
232
    dataset.models = [sky_model, bkg_model]
233
234
    dataset.counts = dataset.npred()
235
    dataset.mask_safe = dataset.mask_fit
236
    assert "MapDataset" in str(dataset)
237
    assert "(frozen)" in str(dataset)
238
    assert "background" in str(dataset)
239
240
    dataset.mask_safe = None
241
    assert "MapDataset" in str(dataset)
242
243
244
def test_map_dataset_str_empty():
245
    dataset = MapDataset()
246
    assert "MapDataset" in str(dataset)
247
248
249
@requires_data()
250
def test_fake(sky_model, geom, geom_etrue):
251
    """Test the fake dataset"""
252
    dataset = get_map_dataset(geom, geom_etrue)
253
254
    bkg_model = FoVBackgroundModel(dataset_name=dataset.name)
255
    dataset.models = [sky_model, bkg_model]
256
257
    npred = dataset.npred()
258
    assert np.all(npred.data >= 0)  # npred must be positive
259
    dataset.counts = npred
260
    real_dataset = dataset.copy()
261
    dataset.fake(314)
262
263
    assert real_dataset.counts.data.shape == dataset.counts.data.shape
264
    assert_allclose(real_dataset.counts.data.sum(), 9525.299054, rtol=1e-5)
265
    assert_allclose(dataset.counts.data.sum(), 9711)
266
267
268
@requires_data()
269
def test_different_exposure_unit(sky_model, geom):
270
    energy_range_true = np.logspace(2, 4, 3)
271
    axis = MapAxis.from_edges(
272
        energy_range_true, name="energy_true", unit="GeV", interp="log"
273
    )
274
    geom_gev = geom.to_image().to_cube([axis])
275
    dataset = get_map_dataset(geom, geom_gev, edisp="None")
276
277
    bkg_model = FoVBackgroundModel(dataset_name=dataset.name)
278
    dataset.models = [sky_model, bkg_model]
279
280
    npred = dataset.npred()
281
282
    assert_allclose(npred.data[0, 50, 50], 6.086019, rtol=1e-2)
283
284
285
@pytest.mark.parametrize(("edisp_mode"), ["edispmap", "edispkernelmap"])
286
@requires_data()
287
def test_to_spectrum_dataset(sky_model, geom, geom_etrue, edisp_mode):
288
289
    dataset_ref = get_map_dataset(geom, geom_etrue, edisp=edisp_mode)
290
291
    bkg_model = FoVBackgroundModel(dataset_name=dataset_ref.name)
292
    dataset_ref.models = [sky_model, bkg_model]
293
294
    dataset_ref.counts = dataset_ref.npred_background() * 0.0
295
    dataset_ref.counts.data[1, 50, 50] = 1
296
    dataset_ref.counts.data[1, 60, 50] = 1
297
298
    gti = GTI.create([0 * u.s], [1 * u.h], reference_time="2010-01-01T00:00:00")
299
    dataset_ref.gti = gti
300
    on_region = CircleSkyRegion(center=geom.center_skydir, radius=0.05 * u.deg)
301
    spectrum_dataset = dataset_ref.to_spectrum_dataset(on_region)
302
    spectrum_dataset_corrected = dataset_ref.to_spectrum_dataset(
303
        on_region, containment_correction=True
304
    )
305
    mask = np.ones_like(dataset_ref.counts, dtype="bool")
306
    mask[1, 40:60, 40:60] = False
307
    dataset_ref.mask_safe = Map.from_geom(dataset_ref.counts.geom, data=mask)
308
    spectrum_dataset_mask = dataset_ref.to_spectrum_dataset(on_region)
309
310
    assert np.sum(spectrum_dataset.counts.data) == 1
311
    assert spectrum_dataset.data_shape == (2, 1, 1)
312
    assert spectrum_dataset.background.geom.axes[0].nbin == 2
313
    assert spectrum_dataset.exposure.geom.axes[0].nbin == 3
314
    assert spectrum_dataset.exposure.unit == "m2s"
315
316
    energy_axis = geom.axes["energy"]
317
    assert (
318
        spectrum_dataset.edisp.get_edisp_kernel(energy_axis=energy_axis)
319
        .axes["energy"]
320
        .nbin
321
        == 2
322
    )
323
    assert (
324
        spectrum_dataset.edisp.get_edisp_kernel(energy_axis=energy_axis)
325
        .axes["energy_true"]
326
        .nbin
327
        == 3
328
    )
329
330
    assert_allclose(spectrum_dataset.edisp.exposure_map.data[1], 3.070917e09, rtol=1e-5)
331
    assert np.sum(spectrum_dataset_mask.counts.data) == 0
332
    assert spectrum_dataset_mask.data_shape == (2, 1, 1)
333
    assert spectrum_dataset_corrected.exposure.unit == "m2s"
334
335
    assert_allclose(spectrum_dataset.exposure.data[1], 3.070884e09, rtol=1e-5)
336
    assert_allclose(spectrum_dataset_corrected.exposure.data[1], 2.05201e09, rtol=1e-5)
337
338
339
@requires_data()
340
def test_energy_range(sky_model, geom1, geom_etrue):
341
    sky_coord1 = SkyCoord(266.5, -29.3, unit="deg")
342
    region1 = CircleSkyRegion(sky_coord1, 0.5 * u.deg)
343
    mask1 = geom1.region_mask([region1]) & geom1.energy_mask(1 * u.TeV, 7 * u.TeV)
344
    sky_coord2 = SkyCoord(266.5, -28.7, unit="deg")
345
    region2 = CircleSkyRegion(sky_coord2, 0.5 * u.deg)
346
    mask2 = geom1.region_mask([region2]) & geom1.energy_mask(2 * u.TeV, 8 * u.TeV)
347
    mask3 = geom1.energy_mask(3 * u.TeV, 6 * u.TeV)
348
349
    mask_safe = Map.from_geom(geom1, data=(mask1 | mask2 | mask3).data)
350
    dataset = get_map_dataset(geom1, geom_etrue, edisp=None, mask_safe=mask_safe)
351
    energy = geom1.axes["energy"].edges.value
352
353
    e_min, e_max = dataset.energy_range_safe
354
    assert_allclose(e_min.get_by_coord((265, -28, 0)), energy[15])
355
    assert_allclose(e_max.get_by_coord((265, -28, 5)), energy[17])
356
    assert_allclose(e_min.get_by_coord((sky_coord1.ra, sky_coord1.dec, 6)), energy[10])
357
    assert_allclose(e_max.get_by_coord((sky_coord1.ra, sky_coord1.dec, 1)), energy[18])
358
    assert_allclose(e_min.get_by_coord((sky_coord2.ra, sky_coord2.dec, 2)), energy[14])
359
    assert_allclose(e_max.get_by_coord((sky_coord2.ra, sky_coord2.dec, 7)), energy[19])
360
    assert_allclose(e_min.get_by_coord((266.5, -29, 8)), energy[10])
361
    assert_allclose(e_max.get_by_coord((266.5, -29, 3)), energy[19])
362
363
    e_min, e_max = dataset.energy_range_fit
364
    assert_allclose(e_min.get_by_coord((265, -28, 0)), np.nan)
365
    assert_allclose(e_max.get_by_coord((265, -28, 5)), np.nan)
366
    assert_allclose(e_min.get_by_coord((266, -29, 4)), energy[0])
367
    assert_allclose(e_max.get_by_coord((266, -29, 9)), energy[20])
368
369
    e_min, e_max = dataset.energy_range
370
    assert_allclose(e_min.get_by_coord((266, -29, 4)), energy[15])
371
    assert_allclose(e_max.get_by_coord((266, -29, 9)), energy[17])
372
373
    mask_zeros = Map.from_geom(geom1, data=np.zeros_like(mask_safe))
374
    e_min, e_max = dataset._energy_range(mask_zeros)
375
    assert_allclose(e_min.get_by_coord((266.5, -29, 8)), np.nan)
376
    assert_allclose(e_max.get_by_coord((266.5, -29, 3)), np.nan)
377
378
    e_min, e_max = dataset._energy_range()
379
    assert_allclose(e_min.get_by_coord((265, -28, 0)), energy[0])
380
    assert_allclose(e_max.get_by_coord((265, -28, 5)), energy[20])
381
382
383
@requires_data()
384
def test_info_dict(sky_model, geom, geom_etrue):
385
    dataset = get_map_dataset(geom, geom_etrue)
386
387
    bkg_model = FoVBackgroundModel(dataset_name=dataset.name)
388
    dataset.models = [sky_model, bkg_model]
389
390
    dataset.counts = dataset.npred()
391
    info_dict = dataset.info_dict()
392
393
    assert_allclose(info_dict["counts"], 9526, rtol=1e-3)
394
    assert_allclose(info_dict["background"], 4000.0005, rtol=1e-3)
395
    assert_allclose(info_dict["npred_background"], 4000.0, rtol=1e-3)
396
    assert_allclose(info_dict["excess"], 5525.756, rtol=1e-3)
397
    assert_allclose(info_dict["exposure_min"].value, 8.32e8, rtol=1e-3)
398
    assert_allclose(info_dict["exposure_max"].value, 1.105e10, rtol=1e-3)
399
    assert info_dict["exposure_max"].unit == "m2 s"
400
    assert info_dict["name"] == "test"
401
402
    gti = GTI.create([0 * u.s], [1 * u.h], reference_time="2010-01-01T00:00:00")
403
    dataset.gti = gti
404
    info_dict = dataset.info_dict()
405
    assert_allclose(info_dict["counts"], 9526, rtol=1e-3)
406
    assert_allclose(info_dict["background"], 4000.0005, rtol=1e-3)
407
    assert_allclose(info_dict["npred_background"], 4000.0, rtol=1e-3)
408
    assert_allclose(info_dict["sqrt_ts"], 74.024180, rtol=1e-3)
409
    assert_allclose(info_dict["excess"], 5525.756, rtol=1e-3)
410
    assert_allclose(info_dict["ontime"].value, 3600)
411
412
    assert info_dict["name"] == "test"
413
414
    # try to dump as json
415
    result = json.dumps(info_dict, cls=JsonQuantityEncoder)
416
    assert "counts" in result
417
418
419
def get_fermi_3fhl_gc_dataset():
420
    counts = Map.read("$GAMMAPY_DATA/fermi-3fhl-gc/fermi-3fhl-gc-counts-cube.fits.gz")
421
    background = Map.read(
422
        "$GAMMAPY_DATA/fermi-3fhl-gc/fermi-3fhl-gc-background-cube.fits.gz"
423
    )
424
    bkg_model = FoVBackgroundModel(dataset_name="fermi-3fhl-gc")
425
426
    exposure = Map.read(
427
        "$GAMMAPY_DATA/fermi-3fhl-gc/fermi-3fhl-gc-exposure-cube.fits.gz"
428
    )
429
    return MapDataset(
430
        counts=counts,
431
        background=background,
432
        models=[bkg_model],
433
        exposure=exposure,
434
        name="fermi-3fhl-gc",
435
    )
436
437
438
@requires_data()
439
def test_resample_energy_3fhl():
440
    dataset = get_fermi_3fhl_gc_dataset()
441
442
    new_axis = MapAxis.from_edges([10, 35, 100] * u.GeV, interp="log", name="energy")
443
    grouped = dataset.resample_energy_axis(energy_axis=new_axis)
444
445
    assert grouped.counts.data.shape == (2, 200, 400)
446
    assert grouped.counts.data[0].sum() == 28581
447
    assert_allclose(
448
        grouped.npred_background().data.sum(axis=(1, 2)),
449
        [25074.366386, 2194.298612],
450
        rtol=1e-5,
451
    )
452
    assert_allclose(grouped.exposure.data, dataset.exposure.data, rtol=1e-5)
453
454
    axis = grouped.counts.geom.axes[0]
455
    npred = dataset.npred()
456
    npred_grouped = grouped.npred()
457
    assert_allclose(npred.resample_axis(axis=axis).data.sum(), npred_grouped.data.sum())
458
459
460
@requires_data()
461
def test_to_image_3fhl():
462
    dataset = get_fermi_3fhl_gc_dataset()
463
464
    dataset_im = dataset.to_image()
465
466
    assert dataset_im.counts.data.sum() == dataset.counts.data.sum()
467
    assert_allclose(dataset_im.npred_background().data.sum(), 28548.625, rtol=1e-5)
468
    assert_allclose(dataset_im.exposure.data, dataset.exposure.data, rtol=1e-5)
469
470
    npred = dataset.npred()
471
    npred_im = dataset_im.npred()
472
    assert_allclose(npred.data.sum(), npred_im.data.sum())
473
474
475
def test_to_image_mask_safe():
476
    axis = MapAxis.from_energy_bounds("0.1 TeV", "10 TeV", nbin=2)
477
    geom = WcsGeom.create(
478
        skydir=(0, 0), binsz=0.5, width=(1, 1), frame="icrs", axes=[axis]
479
    )
480
    dataset = MapDataset.create(geom)
481
482
    # Check map_safe handling
483
    data = np.array([[[False, True], [True, True]], [[False, False], [True, True]]])
484
    dataset.mask_safe = WcsNDMap.from_geom(geom=geom, data=data)
485
486
    dataset_im = dataset.to_image()
487
    assert dataset_im.mask_safe.data.dtype == bool
488
489
    desired = np.array([[False, True], [True, True]])
490
    assert (dataset_im.mask_safe.data == desired).all()
491
492
    # Check that missing entries in the dataset do not break
493
    dataset_copy = dataset.copy()
494
    dataset_copy.exposure = None
495
    dataset_im = dataset_copy.to_image()
496
    assert dataset_im.exposure is None
497
498
    dataset_copy = dataset.copy()
499
    dataset_copy.counts = None
500
    dataset_im = dataset_copy.to_image()
501
    assert dataset_im.counts is None
502
503
504
@requires_data()
505
def test_downsample():
506
    dataset = get_fermi_3fhl_gc_dataset()
507
508
    downsampled = dataset.downsample(2)
509
510
    assert downsampled.counts.data.shape == (11, 100, 200)
511
    assert downsampled.counts.data.sum() == dataset.counts.data.sum()
512
    assert_allclose(
513
        downsampled.npred_background().data.sum(axis=(1, 2)),
514
        dataset.npred_background().data.sum(axis=(1, 2)),
515
        rtol=1e-5,
516
    )
517
    assert_allclose(downsampled.exposure.data[5, 50, 100], 3.318082e11, rtol=1e-5)
518
519
    with pytest.raises(ValueError):
520
        dataset.downsample(2, axis_name="energy")
521
522
523
def test_downsample_energy(geom, geom_etrue):
524
    # This checks that downsample and resample_energy_axis give identical results
525
    counts = Map.from_geom(geom, dtype="int")
526
    counts += 1
527
    mask = Map.from_geom(geom, dtype="bool")
528
    mask.data[1:] = True
529
    counts += 1
530
    exposure = Map.from_geom(geom_etrue, unit="m2s")
531
    edisp = EDispKernelMap.from_gauss(geom.axes[0], geom_etrue.axes[0], 0.1, 0.0)
532
    dataset = MapDataset(
533
        counts=counts,
534
        exposure=exposure,
535
        mask_safe=mask,
536
        edisp=edisp,
537
    )
538
    dataset_downsampled = dataset.downsample(2, axis_name="energy")
539
    dataset_resampled = dataset.resample_energy_axis(geom.axes[0].downsample(2))
540
541
    assert dataset_downsampled.edisp.edisp_map.data.shape == (3, 1, 1, 2)
542
    assert_allclose(
543
        dataset_downsampled.edisp.edisp_map.data[:, :, 0, 0],
544
        dataset_resampled.edisp.edisp_map.data[:, :, 0, 0],
545
    )
546
547
548
@requires_data()
549
def test_map_dataset_fits_io(tmp_path, sky_model, geom, geom_etrue):
550
    dataset = get_map_dataset(geom, geom_etrue)
551
552
    bkg_model = FoVBackgroundModel(dataset_name=dataset.name)
553
    dataset.models = [sky_model, bkg_model]
554
555
    dataset.counts = dataset.npred()
556
    dataset.mask_safe = dataset.mask_fit
557
    gti = GTI.create([0 * u.s], [1 * u.h], reference_time="2010-01-01T00:00:00")
558
    dataset.gti = gti
559
560
    hdulist = dataset.to_hdulist()
561
562
    actual = [hdu.name for hdu in hdulist]
563
564
    desired = [
565
        "PRIMARY",
566
        "COUNTS",
567
        "COUNTS_BANDS",
568
        "EXPOSURE",
569
        "EXPOSURE_BANDS",
570
        "BACKGROUND",
571
        "BACKGROUND_BANDS",
572
        "EDISP",
573
        "EDISP_BANDS",
574
        "EDISP_EXPOSURE",
575
        "EDISP_EXPOSURE_BANDS",
576
        "PSF",
577
        "PSF_BANDS",
578
        "PSF_EXPOSURE",
579
        "PSF_EXPOSURE_BANDS",
580
        "MASK_SAFE",
581
        "MASK_SAFE_BANDS",
582
        "MASK_FIT",
583
        "MASK_FIT_BANDS",
584
        "GTI",
585
    ]
586
587
    assert actual == desired
588
589
    dataset.write(tmp_path / "test.fits")
590
591
    dataset_new = MapDataset.read(tmp_path / "test.fits")
592
593
    assert dataset_new.mask.data.dtype == bool
594
595
    assert_allclose(dataset.counts.data, dataset_new.counts.data)
596
    assert_allclose(
597
        dataset.npred_background().data, dataset_new.npred_background().data
598
    )
599
    assert_allclose(dataset.edisp.edisp_map.data, dataset_new.edisp.edisp_map.data)
600
    assert_allclose(dataset.psf.psf_map.data, dataset_new.psf.psf_map.data)
601
    assert_allclose(dataset.exposure.data, dataset_new.exposure.data)
602
    assert_allclose(dataset.mask_fit.data, dataset_new.mask_fit.data)
603
    assert_allclose(dataset.mask_safe.data, dataset_new.mask_safe.data)
604
605
    assert dataset.counts.geom == dataset_new.counts.geom
606
    assert dataset.exposure.geom == dataset_new.exposure.geom
607
608
    assert_allclose(dataset.exposure.meta["livetime"], 1 * u.h)
609
    assert dataset.npred_background().geom == dataset_new.npred_background().geom
610
    assert dataset.edisp.edisp_map.geom == dataset_new.edisp.edisp_map.geom
611
612
    assert_allclose(
613
        dataset.gti.time_sum.to_value("s"), dataset_new.gti.time_sum.to_value("s")
614
    )
615
616
    # To test io of psf and edisp map
617
    stacked = MapDataset.create(geom)
618
    stacked.write(tmp_path / "test-2.fits", overwrite=True)
619
    stacked1 = MapDataset.read(tmp_path / "test-2.fits")
620
    assert stacked1.psf.psf_map is not None
621
    assert stacked1.psf.exposure_map is not None
622
    assert stacked1.edisp.edisp_map is not None
623
    assert stacked1.edisp.exposure_map is not None
624
    assert stacked.mask.data.dtype == bool
625
626
    assert_allclose(stacked1.psf.psf_map, stacked.psf.psf_map)
627
    assert_allclose(stacked1.edisp.edisp_map, stacked.edisp.edisp_map)
628
629
630
@requires_dependency("iminuit")
631
@requires_dependency("matplotlib")
632
@requires_data()
633
def test_map_fit(sky_model, geom, geom_etrue):
634
    dataset_1 = get_map_dataset(geom, geom_etrue, name="test-1")
635
    dataset_2 = get_map_dataset(geom, geom_etrue, name="test-2")
636
    datasets = Datasets([dataset_1, dataset_2])
637
638
    models = Models(datasets.models)
639
    models.insert(0, sky_model)
640
641
    models["test-1-bkg"].spectral_model.norm.value = 0.5
642
    models["test-model"].spatial_model.sigma.frozen = True
643
644
    datasets.models = models
645
    dataset_2.counts = dataset_2.npred()
646
    dataset_1.counts = dataset_1.npred()
647
648
    models["test-1-bkg"].spectral_model.norm.value = 0.49
649
    models["test-2-bkg"].spectral_model.norm.value = 0.99
650
651
    fit = Fit()
652
    result = fit.run(datasets=datasets)
653
654
    assert result.success
655
    assert "minuit" in repr(result)
656
657
    npred = dataset_1.npred().data.sum()
658
    assert_allclose(npred, 7525.790688, rtol=1e-3)
659
    assert_allclose(result.total_stat, 21625.845714, rtol=1e-3)
660
661
    pars = result.parameters
662
    assert_allclose(pars["lon_0"].value, 0.2, rtol=1e-2)
663
    assert_allclose(pars["lon_0"].error, 0.002244, rtol=1e-2)
664
665
    assert_allclose(pars["index"].value, 3, rtol=1e-2)
666
    assert_allclose(pars["index"].error, 0.0242, rtol=1e-2)
667
668
    assert_allclose(pars["amplitude"].value, 1e-11, rtol=1e-2)
669
    assert_allclose(pars["amplitude"].error, 4.216e-13, rtol=1e-2)
670
671
    # background norm 1
672
    assert_allclose(pars[8].value, 0.5, rtol=1e-2)
673
    assert_allclose(pars[8].error, 0.015811, rtol=1e-2)
674
675
    # background norm 2
676
    assert_allclose(pars[11].value, 1, rtol=1e-2)
677
    assert_allclose(pars[11].error, 0.02147, rtol=1e-2)
678
679
    # test mask_safe evaluation
680
    dataset_1.mask_safe = geom.energy_mask(energy_min=1 * u.TeV)
681
    dataset_2.mask_safe = geom.energy_mask(energy_min=1 * u.TeV)
682
683
    stat = datasets.stat_sum()
684
    assert_allclose(stat, 14823.772744, rtol=1e-5)
685
686
    region = sky_model.spatial_model.to_region()
687
688
    initial_counts = dataset_1.counts.copy()
689
    with mpl_plot_check():
690
        dataset_1.plot_residuals(kwargs_spectral=dict(region=region))
691
692
    # check dataset has not changed
693
    assert initial_counts == dataset_1.counts
694
695
    # test model evaluation outside image
696
    dataset_1.models[0].spatial_model.lon_0.value = 150
697
    dataset_1.npred()
698
    assert not dataset_1.evaluators["test-model"].contributes
699
700
701
@requires_dependency("iminuit")
702
@requires_data()
703
def test_map_fit_one_energy_bin(sky_model, geom_image):
704
    energy_axis = geom_image.axes["energy"]
705
    geom_etrue = geom_image.to_image().to_cube([energy_axis.copy(name="energy_true")])
706
707
    dataset = get_map_dataset(geom_image, geom_etrue)
708
709
    bkg_model = FoVBackgroundModel(dataset_name=dataset.name)
710
    dataset.models = [sky_model, bkg_model]
711
712
    sky_model.spectral_model.index.value = 3.0
713
    sky_model.spectral_model.index.frozen = True
714
    dataset.models[f"{dataset.name}-bkg"].spectral_model.norm.value = 0.5
715
716
    dataset.counts = dataset.npred()
717
718
    # Move a bit away from the best-fit point, to make sure the optimiser runs
719
    sky_model.parameters["sigma"].value = 0.21
720
    dataset.models[f"{dataset.name}-bkg"].parameters["norm"].frozen = True
721
722
    fit = Fit()
723
    result = fit.run(datasets=[dataset])
724
725
    assert result.success
726
727
    npred = dataset.npred().data.sum()
728
    assert_allclose(npred, 16538.124036, rtol=1e-3)
729
    assert_allclose(result.total_stat, -34844.125047, rtol=1e-3)
730
731
    pars = result.parameters
732
733
    assert_allclose(pars["lon_0"].value, 0.2, rtol=1e-2)
734
    assert_allclose(pars["lon_0"].error, 0.001689, rtol=1e-2)
735
736
    assert_allclose(pars["sigma"].value, 0.2, rtol=1e-2)
737
    assert_allclose(pars["sigma"].error, 0.00092, rtol=1e-2)
738
739
    assert_allclose(pars["amplitude"].value, 1e-11, rtol=1e-2)
740
    assert_allclose(pars["amplitude"].error, 8.127593e-14, rtol=1e-2)
741
742
743
def test_create():
744
    # tests empty datasets created
745
    rad_axis = MapAxis(nodes=np.linspace(0.0, 1.0, 51), unit="deg", name="rad")
746
    e_reco = MapAxis.from_edges(
747
        np.logspace(-1.0, 1.0, 3), name="energy", unit=u.TeV, interp="log"
748
    )
749
    e_true = MapAxis.from_edges(
750
        np.logspace(-1.0, 1.0, 4), name="energy_true", unit=u.TeV, interp="log"
751
    )
752
    geom = WcsGeom.create(binsz=0.02, width=(2, 2), axes=[e_reco])
753
    empty_dataset = MapDataset.create(
754
        geom=geom, energy_axis_true=e_true, rad_axis=rad_axis
755
    )
756
757
    assert empty_dataset.counts.data.shape == (2, 100, 100)
758
759
    assert empty_dataset.exposure.data.shape == (3, 100, 100)
760
761
    assert empty_dataset.psf.psf_map.data.shape == (3, 50, 10, 10)
762
    assert empty_dataset.psf.exposure_map.data.shape == (3, 1, 10, 10)
763
764
    assert isinstance(empty_dataset.edisp, EDispKernelMap)
765
    assert empty_dataset.edisp.edisp_map.data.shape == (3, 2, 10, 10)
766
    assert empty_dataset.edisp.exposure_map.data.shape == (3, 1, 10, 10)
767
    assert_allclose(empty_dataset.edisp.edisp_map.data.sum(), 300)
768
769
    assert_allclose(empty_dataset.gti.time_delta, 0.0 * u.s)
770
771
772
def test_create_with_migra(tmp_path):
773
    # tests empty datasets created
774
    migra_axis = MapAxis(nodes=np.linspace(0.0, 3.0, 51), unit="", name="migra")
775
    rad_axis = MapAxis(nodes=np.linspace(0.0, 1.0, 51), unit="deg", name="rad")
776
    e_reco = MapAxis.from_edges(
777
        np.logspace(-1.0, 1.0, 3), name="energy", unit=u.TeV, interp="log"
778
    )
779
    e_true = MapAxis.from_edges(
780
        np.logspace(-1.0, 1.0, 4), name="energy_true", unit=u.TeV, interp="log"
781
    )
782
    geom = WcsGeom.create(binsz=0.02, width=(2, 2), axes=[e_reco])
783
    empty_dataset = MapDataset.create(
784
        geom=geom, energy_axis_true=e_true, migra_axis=migra_axis, rad_axis=rad_axis
785
    )
786
787
    empty_dataset.write(tmp_path / "test.fits")
788
789
    dataset_new = MapDataset.read(tmp_path / "test.fits")
790
791
    assert isinstance(empty_dataset.edisp, EDispMap)
792
    assert empty_dataset.edisp.edisp_map.data.shape == (3, 50, 10, 10)
793
    assert empty_dataset.edisp.exposure_map.data.shape == (3, 1, 10, 10)
794
    assert_allclose(empty_dataset.edisp.edisp_map.data.sum(), 5000)
795
796
    assert_allclose(empty_dataset.gti.time_delta, 0.0 * u.s)
797
798
    assert isinstance(dataset_new.edisp, EDispMap)
799
    assert dataset_new.edisp.edisp_map.data.shape == (3, 50, 10, 10)
800
801
802
def test_stack(sky_model):
803
    axis = MapAxis.from_energy_bounds("0.1 TeV", "10 TeV", nbin=3)
804
    geom = WcsGeom.create(
805
        skydir=(266.40498829, -28.93617776),
806
        binsz=0.05,
807
        width=(2, 2),
808
        frame="icrs",
809
        axes=[axis],
810
    )
811
    axis_etrue = MapAxis.from_energy_bounds(
812
        "0.1 TeV", "10 TeV", nbin=5, name="energy_true"
813
    )
814
    geom_etrue = WcsGeom.create(
815
        skydir=(266.40498829, -28.93617776),
816
        binsz=0.05,
817
        width=(2, 2),
818
        frame="icrs",
819
        axes=[axis_etrue],
820
    )
821
822
    edisp = EDispKernelMap.from_diagonal_response(
823
        energy_axis=axis, energy_axis_true=axis_etrue, geom=geom
824
    )
825
    edisp.exposure_map.quantity = (
826
        1e0 * u.m ** 2 * u.s * np.ones(edisp.exposure_map.data.shape)
827
    )
828
829
    bkg1 = Map.from_geom(geom)
830
    bkg1.data += 0.2
831
832
    cnt1 = Map.from_geom(geom)
833
    cnt1.data = 1.0 * np.ones(cnt1.data.shape)
834
835
    exp1 = Map.from_geom(geom_etrue)
836
    exp1.quantity = 1e7 * u.m ** 2 * u.s * np.ones(exp1.data.shape)
837
838
    mask1 = Map.from_geom(geom)
839
    mask1.data = np.ones(mask1.data.shape, dtype=bool)
840
    mask1.data[0][:][5:10] = False
841
    dataset1 = MapDataset(
842
        counts=cnt1,
843
        background=bkg1,
844
        exposure=exp1,
845
        mask_safe=mask1,
846
        name="dataset-1",
847
        edisp=edisp,
848
        meta_table=Table({"OBS_ID": [0]}),
849
    )
850
851
    bkg2 = Map.from_geom(geom)
852
    bkg2.data = 0.1 * np.ones(bkg2.data.shape)
853
854
    cnt2 = Map.from_geom(geom)
855
    cnt2.data = 1.0 * np.ones(cnt2.data.shape)
856
857
    exp2 = Map.from_geom(geom_etrue)
858
    exp2.quantity = 1e7 * u.m ** 2 * u.s * np.ones(exp2.data.shape)
859
860
    mask2 = Map.from_geom(geom)
861
    mask2.data = np.ones(mask2.data.shape, dtype=bool)
862
    mask2.data[0][:][5:10] = False
863
    mask2.data[1][:][10:15] = False
864
865
    dataset2 = MapDataset(
866
        counts=cnt2,
867
        background=bkg2,
868
        exposure=exp2,
869
        mask_safe=mask2,
870
        name="dataset-2",
871
        edisp=edisp,
872
        meta_table=Table({"OBS_ID": [1]}),
873
    )
874
875
    background_model2 = FoVBackgroundModel(dataset_name="dataset-2")
876
    background_model1 = FoVBackgroundModel(dataset_name="dataset-1")
877
878
    dataset1.models = [background_model1, sky_model]
879
    dataset2.models = [background_model2, sky_model]
880
881
    stacked = MapDataset.from_geoms(**dataset1.geoms)
882
    stacked.stack(dataset1)
883
    stacked.stack(dataset2)
884
885
    stacked.models = [sky_model]
886
    npred_b = stacked.npred()
887
888
    assert_allclose(npred_b.data.sum(), 1459.985035, 1e-5)
889
    assert_allclose(stacked.npred_background().data.sum(), 1360.00, 1e-5)
890
    assert_allclose(stacked.counts.data.sum(), 9000, 1e-5)
891
    assert_allclose(stacked.mask_safe.data.sum(), 4600)
892
    assert_allclose(stacked.exposure.data.sum(), 1.6e11)
893
894
    assert_allclose(stacked.meta_table["OBS_ID"][0], [0, 1])
895
896
897
@requires_data()
898
def test_npred(sky_model, geom, geom_etrue):
899
    dataset = get_map_dataset(geom, geom_etrue)
900
901
    pwl = PowerLawSpectralModel()
902
    gauss = GaussianSpatialModel(
903
        lon_0="0.0 deg", lat_0="0.0 deg", sigma="0.5 deg", frame="galactic"
904
    )
905
    model1 = SkyModel(pwl, gauss, name="m1")
906
907
    bkg = FoVBackgroundModel(dataset_name=dataset.name)
908
    dataset.models = [bkg, sky_model, model1]
909
910
    assert_allclose(
911
        dataset.npred_signal(model_name=model1.name).data.sum(), 150.7487, rtol=1e-3
912
    )
913
    assert dataset._background_cached is None
914
    assert_allclose(dataset.npred_background().data.sum(), 4000., rtol=1e-3)
915
    assert_allclose(dataset._background_cached.data.sum(), 4000., rtol=1e-3)
916
917
    assert_allclose(dataset.npred().data.sum(), 9676.047906, rtol=1e-3)
918
    assert_allclose(dataset.npred_signal().data.sum(), 5676.04790, rtol=1e-3)
919
920
    bkg.spectral_model.norm.value = 1.1
921
    assert_allclose(dataset.npred_background().data.sum(), 4400., rtol=1e-3)
922
    assert_allclose(dataset._background_cached.data.sum(), 4400., rtol=1e-3)
923
924
925
    with pytest.raises(
926
        KeyError,
927
        match="m2",
928
    ):
929
        dataset.npred_signal(model_name="m2")
930
931
932
933
def test_stack_npred():
934
    pwl = PowerLawSpectralModel()
935
    gauss = GaussianSpatialModel(sigma="0.2 deg")
936
    model = SkyModel(pwl, gauss)
937
938
    axis = MapAxis.from_energy_bounds("0.1 TeV", "10 TeV", nbin=5)
939
    axis_etrue = MapAxis.from_energy_bounds(
940
        "0.1 TeV", "10 TeV", nbin=11, name="energy_true"
941
    )
942
943
    geom = WcsGeom.create(
944
        skydir=(0, 0),
945
        binsz=0.05,
946
        width=(2, 2),
947
        frame="icrs",
948
        axes=[axis],
949
    )
950
951
    dataset_1 = MapDataset.create(
952
        geom,
953
        energy_axis_true=axis_etrue,
954
        name="dataset-1",
955
        gti=GTI.create("0 min", "30 min"),
956
    )
957
    dataset_1.psf = None
958
    dataset_1.exposure.data += 1
959
    dataset_1.mask_safe = geom.energy_mask(energy_min=1 * u.TeV)
960
    dataset_1.background.data += 1
961
962
    bkg_model_1 = FoVBackgroundModel(dataset_name=dataset_1.name)
963
    dataset_1.models = [model, bkg_model_1]
964
965
    dataset_2 = MapDataset.create(
966
        geom,
967
        energy_axis_true=axis_etrue,
968
        name="dataset-2",
969
        gti=GTI.create("30 min", "60 min"),
970
    )
971
    dataset_2.psf = None
972
    dataset_2.exposure.data += 1
973
    dataset_2.mask_safe = geom.energy_mask(energy_min=0.2 * u.TeV)
974
    dataset_2.background.data += 1
975
976
    bkg_model_2 = FoVBackgroundModel(dataset_name=dataset_2.name)
977
    dataset_2.models = [model, bkg_model_2]
978
979
    npred_1 = dataset_1.npred()
980
    npred_1.data[~dataset_1.mask_safe.data] = 0
981
    npred_2 = dataset_2.npred()
982
    npred_2.data[~dataset_2.mask_safe.data] = 0
983
984
    stacked_npred = Map.from_geom(geom)
985
    stacked_npred.stack(npred_1)
986
    stacked_npred.stack(npred_2)
987
988
    stacked = MapDataset.create(geom, energy_axis_true=axis_etrue, name="stacked")
989
    stacked.stack(dataset_1)
990
    stacked.stack(dataset_2)
991
992
    npred_stacked = stacked.npred()
993
994
    assert_allclose(npred_stacked.data, stacked_npred.data)
995
996
997
def to_cube(image):
998
    # introduce a fake energy axis for now
999
    axis = MapAxis.from_edges([1, 10] * u.TeV, name="energy")
1000
    geom = image.geom.to_cube([axis])
1001
    return WcsNDMap.from_geom(geom=geom, data=image.data)
1002
1003
1004
@pytest.fixture
1005
def images():
1006
    """Load some `counts`, `counts_off`, `acceptance_on`, `acceptance_off" images"""
1007
    filename = "$GAMMAPY_DATA/tests/unbundled/hess/survey/hess_survey_snippet.fits.gz"
1008
    return {
1009
        "counts": to_cube(WcsNDMap.read(filename, hdu="ON")),
1010
        "counts_off": to_cube(WcsNDMap.read(filename, hdu="OFF")),
1011
        "acceptance": to_cube(WcsNDMap.read(filename, hdu="ONEXPOSURE")),
1012
        "acceptance_off": to_cube(WcsNDMap.read(filename, hdu="OFFEXPOSURE")),
1013
        "exposure": to_cube(WcsNDMap.read(filename, hdu="EXPGAMMAMAP")),
1014
        "background": to_cube(WcsNDMap.read(filename, hdu="BACKGROUND")),
1015
    }
1016
1017
1018
def test_npred_psf_after_edisp():
1019
    energy_axis = MapAxis.from_energy_bounds("1 TeV", "10 TeV", nbin=3)
1020
    energy_axis_true = MapAxis.from_energy_bounds(
1021
        "0.8 TeV", "15 TeV", nbin=6, name="energy_true"
1022
    )
1023
1024
    geom = WcsGeom.create(width=4 * u.deg, binsz=0.02, axes=[energy_axis])
1025
    dataset = MapDataset.create(geom=geom, energy_axis_true=energy_axis_true)
1026
    dataset.background.data += 1
1027
    dataset.exposure.data += 1e12
1028
    dataset.mask_safe.data += True
1029
    dataset.psf = PSFMap.from_gauss(
1030
        energy_axis_true=energy_axis_true, sigma=0.2 * u.deg
1031
    )
1032
1033
    model = SkyModel(
1034
        spectral_model=PowerLawSpectralModel(),
1035
        spatial_model=PointSpatialModel(),
1036
        name="test-model",
1037
    )
1038
1039
    model.apply_irf["psf_after_edisp"] = True
1040
1041
    bkg_model = FoVBackgroundModel(dataset_name=dataset.name)
1042
    dataset.models = [bkg_model, model]
1043
1044
    npred = dataset.npred()
1045
    assert_allclose(npred.data.sum(), 129553.858658)
1046
1047
1048
def get_map_dataset_onoff(images, **kwargs):
1049
    """Returns a MapDatasetOnOff"""
1050
    mask_geom = images["counts"].geom
1051
    mask_data = np.ones(images["counts"].data.shape, dtype=bool)
1052
    mask_safe = Map.from_geom(mask_geom, data=mask_data)
1053
    gti = GTI.create([0 * u.s], [1 * u.h], reference_time="2010-01-01T00:00:00")
1054
    energy_axis = mask_geom.axes["energy"]
1055
    energy_axis_true = energy_axis.copy(name="energy_true")
1056
1057
    psf = PSFMap.from_gauss(
1058
        energy_axis_true=energy_axis_true, sigma=0.2 * u.deg, geom=mask_geom.to_image()
1059
    )
1060
1061
    edisp = EDispKernelMap.from_diagonal_response(
1062
        energy_axis=energy_axis, energy_axis_true=energy_axis_true, geom=mask_geom
1063
    )
1064
1065
    return MapDatasetOnOff(
1066
        counts=images["counts"],
1067
        counts_off=images["counts_off"],
1068
        acceptance=images["acceptance"],
1069
        acceptance_off=images["acceptance_off"],
1070
        exposure=images["exposure"],
1071
        mask_safe=mask_safe,
1072
        psf=psf,
1073
        edisp=edisp,
1074
        gti=gti,
1075
        **kwargs,
1076
    )
1077
1078
1079
@requires_data()
1080
def test_map_dataset_on_off_fits_io(images, tmp_path):
1081
    dataset = get_map_dataset_onoff(images)
1082
    gti = GTI.create([0 * u.s], [1 * u.h], reference_time="2010-01-01T00:00:00")
1083
    dataset.gti = gti
1084
1085
    hdulist = dataset.to_hdulist()
1086
    actual = [hdu.name for hdu in hdulist]
1087
1088
    desired = [
1089
        "PRIMARY",
1090
        "COUNTS",
1091
        "COUNTS_BANDS",
1092
        "EXPOSURE",
1093
        "EXPOSURE_BANDS",
1094
        "EDISP",
1095
        "EDISP_BANDS",
1096
        "EDISP_EXPOSURE",
1097
        "EDISP_EXPOSURE_BANDS",
1098
        "PSF",
1099
        "PSF_BANDS",
1100
        "PSF_EXPOSURE",
1101
        "PSF_EXPOSURE_BANDS",
1102
        "MASK_SAFE",
1103
        "MASK_SAFE_BANDS",
1104
        "GTI",
1105
        "COUNTS_OFF",
1106
        "COUNTS_OFF_BANDS",
1107
        "ACCEPTANCE",
1108
        "ACCEPTANCE_BANDS",
1109
        "ACCEPTANCE_OFF",
1110
        "ACCEPTANCE_OFF_BANDS",
1111
    ]
1112
1113
    assert actual == desired
1114
1115
    dataset.write(tmp_path / "test.fits")
1116
1117
    dataset_new = MapDatasetOnOff.read(tmp_path / "test.fits")
1118
    assert dataset_new.mask.data.dtype == bool
1119
1120
    assert_allclose(dataset.counts.data, dataset_new.counts.data)
1121
    assert_allclose(dataset.counts_off.data, dataset_new.counts_off.data)
1122
    assert_allclose(dataset.acceptance.data, dataset_new.acceptance.data)
1123
    assert_allclose(dataset.acceptance_off.data, dataset_new.acceptance_off.data)
1124
    assert_allclose(dataset.exposure.data, dataset_new.exposure.data)
1125
    assert_allclose(dataset.mask_safe, dataset_new.mask_safe)
1126
1127
    assert np.all(dataset.mask_safe.data == dataset_new.mask_safe.data) == True
1128
    assert dataset.mask_safe.geom == dataset_new.mask_safe.geom
1129
    assert dataset.counts.geom == dataset_new.counts.geom
1130
    assert dataset.exposure.geom == dataset_new.exposure.geom
1131
1132
    assert_allclose(
1133
        dataset.gti.time_sum.to_value("s"), dataset_new.gti.time_sum.to_value("s")
1134
    )
1135
1136
    assert dataset.psf.psf_map == dataset_new.psf.psf_map
1137
    assert dataset.psf.exposure_map == dataset_new.psf.exposure_map
1138
    assert dataset.edisp.edisp_map == dataset_new.edisp.edisp_map
1139
    assert dataset.edisp.exposure_map == dataset_new.edisp.exposure_map
1140
1141
1142
def test_create_onoff(geom):
1143
    # tests empty datasets created
1144
1145
    migra_axis = MapAxis(nodes=np.linspace(0.0, 3.0, 51), unit="", name="migra")
1146
    rad_axis = MapAxis(nodes=np.linspace(0.0, 1.0, 51), unit="deg", name="rad")
1147
    energy_axis = geom.axes["energy"].copy(name="energy_true")
1148
1149
    empty_dataset = MapDatasetOnOff.create(geom, energy_axis, migra_axis, rad_axis)
1150
1151
    assert_allclose(empty_dataset.counts.data.sum(), 0.0)
1152
    assert_allclose(empty_dataset.counts_off.data.sum(), 0.0)
1153
    assert_allclose(empty_dataset.acceptance.data.sum(), 0.0)
1154
    assert_allclose(empty_dataset.acceptance_off.data.sum(), 0.0)
1155
1156
    assert empty_dataset.psf.psf_map.data.shape == (2, 50, 10, 10)
1157
    assert empty_dataset.psf.exposure_map.data.shape == (2, 1, 10, 10)
1158
1159
    assert empty_dataset.edisp.edisp_map.data.shape == (2, 50, 10, 10)
1160
    assert empty_dataset.edisp.exposure_map.data.shape == (2, 1, 10, 10)
1161
1162
    assert_allclose(empty_dataset.edisp.edisp_map.data.sum(), 3333.333333)
1163
1164
    assert_allclose(empty_dataset.gti.time_delta, 0.0 * u.s)
1165
1166
1167
@requires_data()
1168
def test_map_dataset_onoff_str(images):
1169
    dataset = get_map_dataset_onoff(images)
1170
    assert "MapDatasetOnOff" in str(dataset)
1171
1172
1173
@requires_data()
1174
def test_stack_onoff(images):
1175
    dataset = get_map_dataset_onoff(images)
1176
    stacked = dataset.copy()
1177
    stacked.stack(dataset)
1178
1179
    assert_allclose(stacked.counts.data.sum(), 2 * dataset.counts.data.sum())
1180
    assert_allclose(stacked.counts_off.data.sum(), 2 * dataset.counts_off.data.sum())
1181
    assert_allclose(
1182
        stacked.acceptance.data.sum(), dataset.data_shape[1] * dataset.data_shape[2]
1183
    )
1184
    assert_allclose(np.nansum(stacked.acceptance_off.data), 2.925793e08, rtol=1e-5)
1185
    assert_allclose(stacked.exposure.data, 2.0 * dataset.exposure.data)
1186
1187
1188
def test_dataset_cutout_aligned(geom):
1189
    dataset = MapDataset.create(geom)
1190
1191
    kwargs = {"position": geom.center_skydir, "width": 1 * u.deg}
1192
    geoms = {name: geom.cutout(**kwargs) for name, geom in dataset.geoms.items()}
1193
1194
    cutout = MapDataset.from_geoms(**geoms, name="cutout")
1195
1196
    assert dataset.counts.geom.is_aligned(cutout.counts.geom)
1197
    assert dataset.exposure.geom.is_aligned(cutout.exposure.geom)
1198
    assert dataset.edisp.edisp_map.geom.is_aligned(cutout.edisp.edisp_map.geom)
1199
    assert dataset.psf.psf_map.geom.is_aligned(cutout.psf.psf_map.geom)
1200
1201
1202
def test_stack_onoff_cutout(geom_image):
1203
    # Test stacking of cutouts
1204
    energy_axis_true = MapAxis.from_energy_bounds(
1205
        "1 TeV", "10 TeV", nbin=3, name="energy_true"
1206
    )
1207
1208
    dataset = MapDatasetOnOff.create(geom_image, energy_axis_true=energy_axis_true)
1209
    dataset.gti = GTI.create([0 * u.s], [1 * u.h], reference_time="2010-01-01T00:00:00")
1210
1211
    kwargs = {"position": geom_image.center_skydir, "width": 1 * u.deg}
1212
    geoms = {name: geom.cutout(**kwargs) for name, geom in dataset.geoms.items()}
1213
1214
    dataset_cutout = MapDatasetOnOff.from_geoms(**geoms, name="cutout-dataset")
1215
    dataset_cutout.gti = GTI.create(
1216
        [0 * u.s], [1 * u.h], reference_time="2010-01-01T00:00:00"
1217
    )
1218
    dataset_cutout.mask_safe.data += True
1219
    dataset_cutout.counts.data += 1
1220
    dataset_cutout.counts_off.data += 1
1221
    dataset_cutout.exposure.data += 1
1222
1223
    dataset.stack(dataset_cutout)
1224
1225
    assert_allclose(dataset.counts.data.sum(), 2500)
1226
    assert_allclose(dataset.counts_off.data.sum(), 2500)
1227
    assert_allclose(dataset.alpha.data.sum(), 0)
1228
    assert_allclose(dataset.exposure.data.sum(), 7500)
1229
    assert dataset_cutout.name == "cutout-dataset"
1230
1231
1232
def test_datasets_io_no_model(tmpdir):
1233
    axis = MapAxis.from_energy_bounds("1 TeV", "10 TeV", nbin=2)
1234
    geom = WcsGeom.create(npix=(5, 5), axes=[axis])
1235
    dataset_1 = MapDataset.create(geom, name="dataset_1")
1236
    dataset_2 = MapDataset.create(geom, name="dataset_2")
1237
1238
    datasets = Datasets([dataset_1, dataset_2])
1239
1240
    datasets.write(filename=tmpdir / "datasets.yaml")
1241
1242
    filename_1 = tmpdir / "dataset_1.fits"
1243
    assert filename_1.exists()
1244
1245
    filename_2 = tmpdir / "dataset_2.fits"
1246
    assert filename_2.exists()
1247
1248
1249
@requires_data()
1250
def test_map_dataset_on_off_to_spectrum_dataset(images):
1251
    dataset = get_map_dataset_onoff(images)
1252
1253
    gti = GTI.create([0 * u.s], [1 * u.h], reference_time="2010-01-01T00:00:00")
1254
    dataset.gti = gti
1255
1256
    on_region = CircleSkyRegion(
1257
        center=dataset.counts.geom.center_skydir, radius=0.1 * u.deg
1258
    )
1259
1260
    spectrum_dataset = dataset.to_spectrum_dataset(on_region)
1261
1262
    assert spectrum_dataset.counts.data[0] == 8
1263
    assert spectrum_dataset.data_shape == (1, 1, 1)
1264
    assert spectrum_dataset.counts_off.data[0] == 33914
1265
    assert_allclose(spectrum_dataset.alpha.data[0], 0.0002143, atol=1e-7)
1266
1267
    excess_map = images["counts"] - images["background"]
1268
    excess_true = excess_map.get_spectrum(on_region, np.sum).data[0]
1269
1270
    excess = spectrum_dataset.excess.data[0]
1271
    assert_allclose(excess, excess_true, rtol=1e-3)
1272
1273
    assert spectrum_dataset.name != dataset.name
1274
1275
1276
@requires_data()
1277
def test_map_dataset_on_off_to_spectrum_dataset_weights():
1278
    e_reco = MapAxis.from_bounds(1, 10, nbin=3, unit="TeV", name="energy")
1279
1280
    geom = WcsGeom.create(
1281
        skydir=(0, 0), width=(2.5, 2.5), binsz=0.5, axes=[e_reco], frame="galactic"
1282
    )
1283
    counts = Map.from_geom(geom)
1284
    counts.data += 1
1285
    counts_off = Map.from_geom(geom)
1286
    counts_off.data += 2
1287
    acceptance = Map.from_geom(geom)
1288
    acceptance.data += 1
1289
    acceptance_off = Map.from_geom(geom)
1290
    acceptance_off.data += 4
1291
1292
    weights = Map.from_geom(geom, dtype="bool")
1293
    weights.data[1:, 2:4, 2] = True
1294
1295
    gti = GTI.create([0 * u.s], [1 * u.h], reference_time="2010-01-01T00:00:00")
1296
1297
    dataset = MapDatasetOnOff(
1298
        counts=counts,
1299
        counts_off=counts_off,
1300
        acceptance=acceptance,
1301
        acceptance_off=acceptance_off,
1302
        mask_safe=weights,
1303
        gti=gti,
1304
    )
1305
1306
    on_region = CircleSkyRegion(
1307
        center=dataset.counts.geom.center_skydir, radius=1.5 * u.deg
1308
    )
1309
1310
    spectrum_dataset = dataset.to_spectrum_dataset(on_region)
1311
1312
    assert_allclose(spectrum_dataset.counts.data[:, 0, 0], [0, 2, 2])
1313
    assert_allclose(spectrum_dataset.counts_off.data[:, 0, 0], [0, 4, 4])
1314
    assert_allclose(spectrum_dataset.acceptance.data[:, 0, 0], [0, 0.08, 0.08])
1315
    assert_allclose(spectrum_dataset.acceptance_off.data[:, 0, 0], [0, 0.32, 0.32])
1316
    assert_allclose(spectrum_dataset.alpha.data[:, 0, 0], [0, 0.25, 0.25])
1317
1318
1319
@requires_data()
1320
def test_map_dataset_on_off_cutout(images):
1321
    dataset = get_map_dataset_onoff(images)
1322
    gti = GTI.create([0 * u.s], [1 * u.h], reference_time="2010-01-01T00:00:00")
1323
    dataset.gti = gti
1324
1325
    cutout_dataset = dataset.cutout(
1326
        images["counts"].geom.center_skydir, ["1 deg", "1 deg"]
1327
    )
1328
1329
    assert cutout_dataset.counts.data.shape == (1, 50, 50)
1330
    assert cutout_dataset.counts_off.data.shape == (1, 50, 50)
1331
    assert cutout_dataset.acceptance.data.shape == (1, 50, 50)
1332
    assert cutout_dataset.acceptance_off.data.shape == (1, 50, 50)
1333
    assert cutout_dataset.name != dataset.name
1334
1335
1336
def test_map_dataset_on_off_fake(geom):
1337
    rad_axis = MapAxis(nodes=np.linspace(0.0, 1.0, 51), unit="deg", name="rad")
1338
    energy_true_axis = geom.axes["energy"].copy(name="energy_true")
1339
1340
    empty_dataset = MapDatasetOnOff.create(geom, energy_true_axis, rad_axis=rad_axis)
1341
    empty_dataset.acceptance.data = 1.0
1342
    empty_dataset.acceptance_off.data = 10.0
1343
1344
    empty_dataset.acceptance_off.data[0, 50, 50] = 0
1345
    background_map = Map.from_geom(geom, data=1)
1346
    empty_dataset.fake(background_map, random_state=42)
1347
1348
    assert_allclose(empty_dataset.counts.data[0, 50, 50], 0)
1349
    assert_allclose(empty_dataset.counts.data.mean(), 0.99445, rtol=1e-3)
1350
    assert_allclose(empty_dataset.counts_off.data.mean(), 10.00055, rtol=1e-3)
1351
1352
1353
@requires_data()
1354
def test_map_dataset_on_off_to_image():
1355
    axis = MapAxis.from_energy_bounds(1, 10, 2, unit="TeV")
1356
    geom = WcsGeom.create(npix=(10, 10), binsz=0.05, axes=[axis])
1357
1358
    counts = Map.from_geom(geom, data=np.ones((2, 10, 10)))
1359
    counts_off = Map.from_geom(geom, data=np.ones((2, 10, 10)))
1360
    acceptance = Map.from_geom(geom, data=np.ones((2, 10, 10)))
1361
    acceptance_off = Map.from_geom(geom, data=np.ones((2, 10, 10)))
1362
    acceptance_off *= 2
1363
1364
    dataset = MapDatasetOnOff(
1365
        counts=counts,
1366
        counts_off=counts_off,
1367
        acceptance=acceptance,
1368
        acceptance_off=acceptance_off,
1369
    )
1370
    image_dataset = dataset.to_image()
1371
1372
    assert image_dataset.counts.data.shape == (1, 10, 10)
1373
    assert image_dataset.acceptance_off.data.shape == (1, 10, 10)
1374
    assert_allclose(image_dataset.acceptance, 2)
1375
    assert_allclose(image_dataset.acceptance_off, 4)
1376
    assert_allclose(image_dataset.counts_off, 2)
1377
    assert image_dataset.name != dataset.name
1378
1379
    # Try with a safe_mask
1380
    mask_safe = Map.from_geom(geom, data=np.ones((2, 10, 10), dtype="bool"))
1381
    mask_safe.data[0] = 0
1382
    dataset.mask_safe = mask_safe
1383
    image_dataset = dataset.to_image()
1384
1385
    assert_allclose(image_dataset.acceptance, 1)
1386
    assert_allclose(image_dataset.acceptance_off, 2)
1387
    assert_allclose(image_dataset.counts_off, 1)
1388
1389
1390
def test_map_dataset_geom(geom, sky_model):
1391
    e_true = MapAxis.from_energy_bounds("1 TeV", "10 TeV", nbin=5, name="energy_true")
1392
    dataset = MapDataset.create(geom, energy_axis_true=e_true)
1393
    dataset.counts = None
1394
    dataset.background = None
1395
1396
    npred = dataset.npred()
1397
    assert npred.geom == geom
1398
1399
    dataset.mask_safe = None
1400
    dataset.mask_fit = None
1401
1402
    with pytest.raises(ValueError):
1403
        dataset._geom
1404
1405
1406
@requires_data()
1407
def test_names(geom, geom_etrue, sky_model):
1408
    m = Map.from_geom(geom)
1409
    m.quantity = 0.2 * np.ones(m.data.shape)
1410
    background_model1 = FoVBackgroundModel(dataset_name="test")
1411
    assert background_model1.name == "test-bkg"
1412
1413
    c_map1 = Map.from_geom(geom)
1414
    c_map1.quantity = 0.3 * np.ones(c_map1.data.shape)
1415
1416
    model1 = sky_model.copy()
1417
    assert model1.name != sky_model.name
1418
    model1 = sky_model.copy(name="model1")
1419
    assert model1.name == "model1"
1420
    model2 = sky_model.copy(name="model2")
1421
1422
    dataset1 = MapDataset(
1423
        counts=c_map1,
1424
        models=Models([model1, model2, background_model1]),
1425
        exposure=get_exposure(geom_etrue),
1426
        background=m,
1427
        name="test",
1428
    )
1429
1430
    dataset2 = dataset1.copy()
1431
    assert dataset2.name != dataset1.name
1432
    assert dataset2.models is None
1433
1434
    dataset2 = dataset1.copy(name="dataset2")
1435
1436
    assert dataset2.name == "dataset2"
1437
    assert dataset2.models is None
1438
1439
1440
def test_stack_dataset_dataset_on_off():
1441
    axis = MapAxis.from_edges([1, 10] * u.TeV, name="energy")
1442
    geom = WcsGeom.create(width=1, axes=[axis])
1443
1444
    gti = GTI.create([0 * u.s], [1 * u.h])
1445
1446
    dataset = MapDataset.create(geom, gti=gti)
1447
    dataset_on_off = MapDatasetOnOff.create(geom, gti=gti)
1448
    dataset_on_off.mask_safe.data += True
1449
1450
    dataset_on_off.acceptance_off += 5
1451
    dataset_on_off.acceptance += 1
1452
    dataset_on_off.counts_off += 1
1453
    dataset.stack(dataset_on_off)
1454
1455
    assert_allclose(dataset.npred_background().data, 0.166667, rtol=1e-3)
1456
1457
1458
@requires_data()
1459
def test_info_dict_on_off(images):
1460
    dataset = get_map_dataset_onoff(images)
1461
    info_dict = dataset.info_dict()
1462
    assert_allclose(info_dict["counts"], 4299, rtol=1e-3)
1463
    assert_allclose(info_dict["excess"], -22.52295, rtol=1e-3)
1464
    assert_allclose(info_dict["exposure_min"].value, 1.739467e08, rtol=1e-3)
1465
    assert_allclose(info_dict["exposure_max"].value, 3.4298378e09, rtol=1e-3)
1466
    assert_allclose(info_dict["npred"], 4321.518, rtol=1e-3)
1467
    assert_allclose(info_dict["counts_off"], 20407510.0, rtol=1e-3)
1468
    assert_allclose(info_dict["acceptance"], 4272.7075, rtol=1e-3)
1469
    assert_allclose(info_dict["acceptance_off"], 20175596.0, rtol=1e-3)
1470
    assert_allclose(info_dict["alpha"], 0.000169, rtol=1e-3)
1471
    assert_allclose(info_dict["ontime"].value, 3600)
1472
1473
1474
def test_slice_by_idx():
1475
    axis = MapAxis.from_energy_bounds("0.1 TeV", "10 TeV", nbin=17)
1476
    axis_etrue = MapAxis.from_energy_bounds(
1477
        "0.1 TeV", "10 TeV", nbin=31, name="energy_true"
1478
    )
1479
1480
    geom = WcsGeom.create(
1481
        skydir=(0, 0),
1482
        binsz=0.5,
1483
        width=(2, 2),
1484
        frame="icrs",
1485
        axes=[axis],
1486
    )
1487
    dataset = MapDataset.create(geom=geom, energy_axis_true=axis_etrue, binsz_irf=0.5)
1488
1489
    slices = {"energy": slice(5, 10)}
1490
    sub_dataset = dataset.slice_by_idx(slices)
1491
1492
    assert sub_dataset.counts.geom.data_shape == (5, 4, 4)
1493
    assert sub_dataset.mask_safe.geom.data_shape == (5, 4, 4)
1494
    assert sub_dataset.npred_background().geom.data_shape == (5, 4, 4)
1495
    assert sub_dataset.exposure.geom.data_shape == (31, 4, 4)
1496
    assert sub_dataset.edisp.edisp_map.geom.data_shape == (31, 5, 4, 4)
1497
    assert sub_dataset.psf.psf_map.geom.data_shape == (31, 66, 4, 4)
1498
1499
    axis = sub_dataset.counts.geom.axes["energy"]
1500
    assert_allclose(axis.edges[0].value, 0.387468, rtol=1e-5)
1501
1502
    slices = {"energy_true": slice(5, 10)}
1503
    sub_dataset = dataset.slice_by_idx(slices)
1504
1505
    assert sub_dataset.counts.geom.data_shape == (17, 4, 4)
1506
    assert sub_dataset.mask_safe.geom.data_shape == (17, 4, 4)
1507
    assert sub_dataset.npred_background().geom.data_shape == (17, 4, 4)
1508
    assert sub_dataset.exposure.geom.data_shape == (5, 4, 4)
1509
    assert sub_dataset.edisp.edisp_map.geom.data_shape == (5, 17, 4, 4)
1510
    assert sub_dataset.psf.psf_map.geom.data_shape == (5, 66, 4, 4)
1511
1512
    axis = sub_dataset.counts.geom.axes["energy"]
1513
    assert_allclose(axis.edges[0].value, 0.1, rtol=1e-5)
1514
1515
    axis = sub_dataset.exposure.geom.axes["energy_true"]
1516
    assert_allclose(axis.edges[0].value, 0.210175, rtol=1e-5)
1517
1518
1519
@requires_dependency("matplotlib")
1520
def test_plot_residual_onoff():
1521
    axis = MapAxis.from_energy_bounds(1, 10, 2, unit="TeV")
1522
    geom = WcsGeom.create(npix=(10, 10), binsz=0.05, axes=[axis])
1523
1524
    counts = Map.from_geom(geom, data=np.ones((2, 10, 10)))
1525
    counts_off = Map.from_geom(geom, data=np.ones((2, 10, 10)))
1526
    acceptance = Map.from_geom(geom, data=np.ones((2, 10, 10)))
1527
    acceptance_off = Map.from_geom(geom, data=np.ones((2, 10, 10)))
1528
    acceptance_off *= 2
1529
1530
    dataset = MapDatasetOnOff(
1531
        counts=counts,
1532
        counts_off=counts_off,
1533
        acceptance=acceptance,
1534
        acceptance_off=acceptance_off,
1535
    )
1536
    with mpl_plot_check():
1537
        dataset.plot_residuals_spatial()
1538
1539
1540
def test_to_map_dataset():
1541
    axis = MapAxis.from_energy_bounds(1, 10, 2, unit="TeV")
1542
    geom = WcsGeom.create(npix=(10, 10), binsz=0.05, axes=[axis])
1543
1544
    counts = Map.from_geom(geom, data=np.ones((2, 10, 10)))
1545
    counts_off = Map.from_geom(geom, data=np.ones((2, 10, 10)))
1546
    acceptance = Map.from_geom(geom, data=np.ones((2, 10, 10)))
1547
    acceptance_off = Map.from_geom(geom, data=np.ones((2, 10, 10)))
1548
    acceptance_off *= 2
1549
1550
    dataset_onoff = MapDatasetOnOff(
1551
        counts=counts,
1552
        counts_off=counts_off,
1553
        acceptance=acceptance,
1554
        acceptance_off=acceptance_off,
1555
    )
1556
1557
    dataset = dataset_onoff.to_map_dataset(name="ds")
1558
1559
    assert dataset.name == "ds"
1560
    assert_allclose(dataset.npred_background().data.sum(), 100)
1561
    assert isinstance(dataset, MapDataset)
1562
    assert dataset.counts == dataset_onoff.counts
1563
1564
1565
def test_downsample_onoff():
1566
    axis = MapAxis.from_energy_bounds(1, 10, 4, unit="TeV")
1567
    geom = WcsGeom.create(npix=(10, 10), binsz=0.05, axes=[axis])
1568
1569
    counts = Map.from_geom(geom, data=np.ones((4, 10, 10)))
1570
    counts_off = Map.from_geom(geom, data=np.ones((4, 10, 10)))
1571
    acceptance = Map.from_geom(geom, data=np.ones((4, 10, 10)))
1572
    acceptance_off = Map.from_geom(geom, data=np.ones((4, 10, 10)))
1573
    acceptance_off *= 2
1574
1575
    dataset_onoff = MapDatasetOnOff(
1576
        counts=counts,
1577
        counts_off=counts_off,
1578
        acceptance=acceptance,
1579
        acceptance_off=acceptance_off,
1580
    )
1581
1582
    downsampled = dataset_onoff.downsample(2, axis_name="energy")
1583
1584
    assert downsampled.counts.data.shape == (2, 10, 10)
1585
    assert downsampled.counts.data.sum() == dataset_onoff.counts.data.sum()
1586
    assert downsampled.counts_off.data.sum() == dataset_onoff.counts_off.data.sum()
1587
    assert_allclose(downsampled.alpha.data, 0.5)
1588
1589
1590
@requires_data()
1591
def test_source_outside_geom(sky_model, geom, geom_etrue):
1592
    dataset = get_map_dataset(geom, geom_etrue)
1593
    dataset.edisp = get_edisp(geom, geom_etrue)
1594
1595
    models = dataset.models
1596
    model = SkyModel(
1597
        PowerLawSpectralModel(),
1598
        DiskSpatialModel(lon_0=276.4 * u.deg, lat_0=-28.9 * u.deg, r_0=10 * u.deg),
1599
    )
1600
1601
    assert not geom.to_image().contains(model.position)[0]
1602
    dataset.models = models + [model]
1603
    dataset.npred()
1604
    model_npred = dataset.evaluators[model.name].compute_npred().data
1605
    assert np.sum(np.isnan(model_npred)) == 0
1606
    assert np.sum(~np.isfinite(model_npred)) == 0
1607
    assert np.sum(model_npred) > 0
1608
1609
1610
# this is a regression test for an issue found, where the model selection fails
1611
@requires_data()
1612
def test_source_outside_geom_fermi():
1613
    dataset = MapDataset.read(
1614
        "$GAMMAPY_DATA/fermi-3fhl-gc/fermi-3fhl-gc.fits.gz", format="gadf"
1615
    )
1616
1617
    catalog = SourceCatalog3FHL()
1618
    source = catalog["3FHL J1637.8-3448"]
1619
1620
    dataset.models = source.sky_model()
1621
    npred = dataset.npred()
1622
1623
    assert_allclose(npred.data.sum(), 28548.63, rtol=1e-4)
1624
1625
1626
def test_region_geom_io(tmpdir):
1627
    axis = MapAxis.from_energy_bounds("1 TeV", "10 TeV", nbin=1)
1628
    geom = RegionGeom.create("icrs;circle(0, 0, 0.2)", axes=[axis])
1629
1630
    dataset = MapDataset.create(geom)
1631
1632
    filename = tmpdir / "test.fits"
1633
    dataset.write(filename)
1634
1635
    dataset = MapDataset.read(filename, format="gadf")
1636
1637
    assert isinstance(dataset.counts.geom, RegionGeom)
1638
    assert isinstance(dataset.edisp.edisp_map.geom, RegionGeom)
1639
    assert isinstance(dataset.psf.psf_map.geom, RegionGeom)
1640
1641
1642
def test_dataset_mixed_geom(tmpdir):
1643
    energy_axis = MapAxis.from_energy_bounds("1 TeV", "10 TeV", nbin=3)
1644
    energy_axis_true = MapAxis.from_energy_bounds(
1645
        "1 TeV", "10 TeV", nbin=7, name="energy_true"
1646
    )
1647
1648
    rad_axis = MapAxis.from_bounds(0, 1, nbin=10, name="rad", unit="deg")
1649
1650
    geom = WcsGeom.create(npix=5, axes=[energy_axis])
1651
    geom_exposure = WcsGeom.create(npix=5, axes=[energy_axis_true])
1652
1653
    geom_psf = RegionGeom.create(
1654
        "icrs;circle(0, 0, 0.2)", axes=[rad_axis, energy_axis_true]
1655
    )
1656
1657
    geom_edisp = RegionGeom.create(
1658
        "icrs;circle(0, 0, 0.2)", axes=[energy_axis, energy_axis_true]
1659
    )
1660
1661
    dataset = MapDataset.from_geoms(
1662
        geom=geom, geom_exposure=geom_exposure, geom_psf=geom_psf, geom_edisp=geom_edisp
1663
    )
1664
1665
    filename = tmpdir / "test.fits"
1666
    dataset.write(filename)
1667
1668
    dataset = MapDataset.read(filename, format="gadf")
1669
1670
    assert isinstance(dataset.counts.geom, WcsGeom)
1671
    assert isinstance(dataset.exposure.geom, WcsGeom)
1672
    assert isinstance(dataset.background.geom, WcsGeom)
1673
1674
    assert isinstance(dataset.psf.psf_map.geom.region, CircleSkyRegion)
1675
    assert isinstance(dataset.edisp.edisp_map.geom.region, CircleSkyRegion)
1676
1677
1678
@requires_data()
1679
def test_map_dataset_region_geom_npred():
1680
    dataset = MapDataset.read("$GAMMAPY_DATA/cta-1dc-gc/cta-1dc-gc.fits.gz")
1681
1682
    pwl = PowerLawSpectralModel()
1683
    point = PointSpatialModel(lon_0="0 deg", lat_0="0 deg", frame="galactic")
1684
    model_1 = SkyModel(pwl, point, name="model-1")
1685
1686
    pwl = PowerLawSpectralModel(amplitude="1e-11 TeV-1 cm-2 s-1")
1687
    gauss = GaussianSpatialModel(
1688
        lon_0="0.3 deg", lat_0="0.3 deg", sigma="0.5 deg", frame="galactic"
1689
    )
1690
    model_2 = SkyModel(pwl, gauss, name="model-2")
1691
1692
    dataset.models = [model_1, model_2]
1693
1694
    region = RegionGeom.create("galactic;circle(0, 0, 0.4)").region
1695
    npred_ref = dataset.npred().to_region_nd_map(region)
1696
1697
    dataset_spec = dataset.to_region_map_dataset(region)
1698
    dataset_spec.models = [model_1, model_2]
1699
1700
    npred = dataset_spec.npred()
1701
1702
    assert_allclose(npred_ref.data, npred.data, rtol=1e-2)
1703
1704
1705 View Code Duplication
@requires_dependency("healpy")
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1706
def test_map_dataset_create_hpx_geom(geom_hpx):
1707
1708
    dataset = MapDataset.create(**geom_hpx, binsz_irf=10 * u.deg)
1709
1710
    assert isinstance(dataset.counts.geom, HpxGeom)
1711
    assert dataset.counts.data.shape == (3, 12288)
1712
1713
    assert isinstance(dataset.background.geom, HpxGeom)
1714
    assert dataset.background.data.shape == (3, 12288)
1715
1716
    assert isinstance(dataset.exposure.geom, HpxGeom)
1717
    assert dataset.exposure.data.shape == (4, 12288)
1718
1719
    assert isinstance(dataset.edisp.edisp_map.geom, HpxGeom)
1720
    assert dataset.edisp.edisp_map.data.shape == (4, 3, 768)
1721
1722
    assert isinstance(dataset.psf.psf_map.geom, HpxGeom)
1723
    assert dataset.psf.psf_map.data.shape == (4, 66, 768)
1724
1725
1726 View Code Duplication
@requires_dependency("healpy")
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1727
def test_map_dataset_create_hpx_geom_partial(geom_hpx_partial):
1728
1729
    dataset = MapDataset.create(**geom_hpx_partial, binsz_irf=2 * u.deg)
1730
1731
    assert isinstance(dataset.counts.geom, HpxGeom)
1732
    assert dataset.counts.data.shape == (3, 90)
1733
1734
    assert isinstance(dataset.background.geom, HpxGeom)
1735
    assert dataset.background.data.shape == (3, 90)
1736
1737
    assert isinstance(dataset.exposure.geom, HpxGeom)
1738
    assert dataset.exposure.data.shape == (4, 90)
1739
1740
    assert isinstance(dataset.edisp.edisp_map.geom, HpxGeom)
1741
    assert dataset.edisp.edisp_map.data.shape == (4, 3, 24)
1742
1743
    assert isinstance(dataset.psf.psf_map.geom, HpxGeom)
1744
    assert dataset.psf.psf_map.data.shape == (4, 66, 24)
1745
1746
1747
@requires_dependency("healpy")
1748
def test_map_dataset_stack_hpx_geom(geom_hpx_partial, geom_hpx):
1749
1750
    dataset_all = MapDataset.create(**geom_hpx, binsz_irf=5 * u.deg)
1751
1752
    gti = GTI.create(start=0 * u.s, stop=30 * u.min)
1753
    dataset_cutout = MapDataset.create(**geom_hpx_partial, binsz_irf=5 * u.deg, gti=gti)
1754
    dataset_cutout.counts.data += 1
1755
    dataset_cutout.background.data += 1
1756
    dataset_cutout.exposure.data += 1
1757
    dataset_cutout.mask_safe.data[...] = True
1758
1759
    dataset_all.stack(dataset_cutout)
1760
1761
    assert_allclose(dataset_all.counts.data.sum(), 3 * 90)
1762
    assert_allclose(dataset_all.background.data.sum(), 3 * 90)
1763
    assert_allclose(dataset_all.exposure.data.sum(), 4 * 90)
1764
1765
1766
@requires_data()
1767
@requires_dependency("healpy")
1768
def test_map_dataset_hpx_geom_npred(geom_hpx_partial):
1769
    hpx_geom = geom_hpx_partial["geom"]
1770
    hpx_true = hpx_geom.to_image().to_cube([geom_hpx_partial["energy_axis_true"]])
1771
    dataset = get_map_dataset(hpx_geom, hpx_true, edisp="edispkernelmap")
1772
1773
    pwl = PowerLawSpectralModel()
1774
    point = PointSpatialModel(lon_0="110 deg", lat_0="75 deg", frame="galactic")
1775
    sky_model = SkyModel(pwl, point)
1776
1777
    dataset.models = [sky_model]
1778
1779
    assert_allclose(dataset.npred().data.sum(), 54, rtol=1e-3)
1780