Passed
Pull Request — master (#1898)
by
unknown
04:26
created

gammapy/irf/energy_dispersion.py (1 issue)

1
# Licensed under a 3-clause BSD style license - see LICENSE.rst
0 ignored issues
show
Too many lines in module (1040/1000)
Loading history...
2
from __future__ import absolute_import, division, print_function, unicode_literals
3
from collections import OrderedDict
4
import numpy as np
5
from astropy.io import fits
6
from astropy.coordinates import Angle
7
from astropy.units import Quantity
8
from astropy.table import Table
9
from ..utils.energy import EnergyBounds, Energy
10
from ..utils.scripts import make_path
11
from ..utils.nddata import NDDataArray, BinnedDataAxis
12
from ..utils.fits import energy_axis_to_ebounds
13
14
__all__ = ["EnergyDispersion", "EnergyDispersion2D"]
15
16
17
class EnergyDispersion(object):
18
    """Energy dispersion matrix.
19
20
    Data format specification: :ref:`gadf:ogip-rmf`
21
22
    Parameters
23
    ----------
24
    e_true_lo, e_true_hi : `~astropy.units.Quantity`
25
        True energy axis binning
26
    e_reco_lo, e_reco_hi : `~astropy.units.Quantity`
27
        Reconstruced energy axis binning
28
    data : array_like
29
        2-dim energy dispersion matrix
30
31
    Examples
32
    --------
33
    Create a Gaussian energy dispersion matrix::
34
35
        import numpy as np
36
        import astropy.units as u
37
        from gammapy.irf import EnergyDispersion
38
        energy = np.logspace(0, 1, 101) * u.TeV
39
        edisp = EnergyDispersion.from_gauss(
40
            e_true=energy, e_reco=energy,
41
            sigma=0.1, bias=0,
42
        )
43
44
    Have a quick look:
45
46
    >>> print(edisp)
47
    >>> edisp.peek()
48
49
    See Also
50
    --------
51
    EnergyDispersion2D
52
    """
53
54
    default_interp_kwargs = dict(bounds_error=False, fill_value=0, method="nearest")
55
    """Default Interpolation kwargs for `~NDDataArray`. Fill zeros and do not
56
    interpolate"""
57
58
    def __init__(
59
        self,
60
        e_true_lo,
61
        e_true_hi,
62
        e_reco_lo,
63
        e_reco_hi,
64
        data,
65
        interp_kwargs=None,
66
        meta=None,
67
    ):
68
        if interp_kwargs is None:
69
            interp_kwargs = self.default_interp_kwargs
70
        axes = [
71
            BinnedDataAxis(
72
                e_true_lo, e_true_hi, interpolation_mode="log", name="e_true"
73
            ),
74
            BinnedDataAxis(
75
                e_reco_lo, e_reco_hi, interpolation_mode="log", name="e_reco"
76
            ),
77
        ]
78
        self.data = NDDataArray(axes=axes, data=data, interp_kwargs=interp_kwargs)
79
        self.meta = OrderedDict(meta) if meta else OrderedDict()
80
81
    def __str__(self):
82
        ss = self.__class__.__name__
83
        ss += "\n{}".format(self.data)
84
        return ss
85
86
    def apply(self, data):
87
        """Apply energy dispersion.
88
89
        Computes the matrix product of ``data``
90
        (which typically is model flux or counts in true energy bins)
91
        with the energy dispersion matrix.
92
93
        Parameters
94
        ----------
95
        data : array_like
96
            1-dim data array.
97
98
        Returns
99
        -------
100
        convolved_data : array
101
            1-dim data array after multiplication with the energy dispersion matrix
102
        """
103
        if len(data) != self.e_true.nbins:
104
            raise ValueError(
105
                "Input size {} does not match true energy axis {}".format(
106
                    len(data), self.e_true.nbins
107
                )
108
            )
109
        return np.dot(data, self.data.data)
110
111
    @property
112
    def e_reco(self):
113
        """Reconstructed energy axis (`~gammapy.utils.nddata.BinnedDataAxis`)"""
114
        return self.data.axis("e_reco")
115
116
    @property
117
    def e_true(self):
118
        """True energy axis (`~gammapy.utils.nddata.BinnedDataAxis`)"""
119
        return self.data.axis("e_true")
120
121
    @property
122
    def pdf_matrix(self):
123
        """Energy dispersion PDF matrix (`~numpy.ndarray`).
124
125
        Rows (first index): True Energy
126
        Columns (second index): Reco Energy
127
        """
128
        return self.data.data.value
129
130
    def pdf_in_safe_range(self, lo_threshold, hi_threshold):
131
        """PDF matrix with bins outside threshold set to 0.
132
133
        Parameters
134
        ----------
135
        lo_threshold : `~astropy.units.Quantity`
136
            Low reco energy threshold
137
        hi_threshold : `~astropy.units.Quantity`
138
            High reco energy threshold
139
        """
140
        data = self.pdf_matrix.copy()
141
        idx = np.where(
142
            (self.e_reco.lo < lo_threshold) | (self.e_reco.hi > hi_threshold)
143
        )
144
        data[:, idx] = 0
145
        return data
146
147
    @classmethod
148
    def from_gauss(cls, e_true, e_reco, sigma, bias, pdf_threshold=1e-6):
149
        """Create Gaussian energy dispersion matrix (`EnergyDispersion`).
150
151
        Calls :func:`gammapy.irf.EnergyDispersion2D.from_gauss`
152
153
        Parameters
154
        ----------
155
        e_true : `~astropy.units.Quantity`, `~gammapy.utils.nddata.BinnedDataAxis`
156
            Bin edges of true energy axis
157
        e_reco : `~astropy.units.Quantity`, `~gammapy.utils.nddata.BinnedDataAxis`
158
            Bin edges of reconstructed energy axis
159
        bias : float or `~numpy.ndarray`
160
            Center of Gaussian energy dispersion, bias
161
        sigma : float or `~numpy.ndarray`
162
            RMS width of Gaussian energy dispersion, resolution
163
        pdf_threshold : float, optional
164
            Zero suppression threshold
165
        """
166
        migra = np.linspace(1.0 / 3, 3, 200)
167
        # A dummy offset axis (need length 2 for interpolation to work)
168
        offset = Quantity([0, 1, 2], "deg")
169
170
        edisp = EnergyDispersion2D.from_gauss(
171
            e_true=e_true,
172
            migra=migra,
173
            sigma=sigma,
174
            bias=bias,
175
            offset=offset,
176
            pdf_threshold=pdf_threshold,
177
        )
178
        return edisp.to_energy_dispersion(offset=offset[0], e_reco=e_reco)
179
180
    @classmethod
181
    def from_diagonal_response(cls, e_true, e_reco=None):
182
        """Create energy dispersion from a diagonal response, i.e. perfect energy resolution
183
184
        This creates the matrix corresponding to a perfect energy response.
185
        It contains ones where the e_true center is inside the e_reco bin.
186
        It is a square diagonal matrix if e_true = e_reco.
187
188
        This is useful in cases where code always applies an edisp,
189
        but you don't want it to do anything.
190
191
        Parameters
192
        ----------
193
        e_true, e_reco : `~astropy.units.Quantity`
194
            Energy bounds for true and reconstructed energy axis
195
196
        Examples
197
        --------
198
        If ``e_true`` equals ``e_reco``, you get a diagonal matrix::
199
200
            e_true = [0.5, 1, 2, 4, 6] * u.TeV
201
            edisp = EnergyDispersion.from_diagonal_response(e_true)
202
            edisp.plot_matrix()
203
204
        Example with different energy binnings::
205
206
            e_true = [0.5, 1, 2, 4, 6] * u.TeV
207
            e_reco = [2, 4, 6] * u.TeV
208
            edisp = EnergyDispersion.from_diagonal_response(e_true, e_reco)
209
            edisp.plot_matrix()
210
        """
211
        if e_reco is None:
212
            e_reco = e_true
213
214
        e_true_center = 0.5 * (e_true[1:] + e_true[:-1])
215
        etrue_2d, ereco_lo_2d = np.meshgrid(e_true_center, e_reco[:-1])
216
        etrue_2d, ereco_hi_2d = np.meshgrid(e_true_center, e_reco[1:])
217
218
        data = np.logical_and(etrue_2d >= ereco_lo_2d, etrue_2d < ereco_hi_2d)
219
        data = np.transpose(data).astype("float")
220
221
        return cls(
222
            e_true_lo=e_true[:-1],
223
            e_true_hi=e_true[1:],
224
            e_reco_lo=e_reco[:-1],
225
            e_reco_hi=e_reco[1:],
226
            data=data,
227
        )
228
229
    @classmethod
230
    def from_hdulist(cls, hdulist, hdu1="MATRIX", hdu2="EBOUNDS"):
231
        """Create `EnergyDispersion` object from `~astropy.io.fits.HDUList`.
232
233
        Parameters
234
        ----------
235
        hdulist : `~astropy.io.fits.HDUList`
236
            HDU list with ``MATRIX`` and ``EBOUNDS`` extensions.
237
        hdu1 : str, optional
238
            HDU containing the energy dispersion matrix, default: MATRIX
239
        hdu2 : str, optional
240
            HDU containing the energy axis information, default, EBOUNDS
241
        """
242
        matrix_hdu = hdulist[hdu1]
243
        ebounds_hdu = hdulist[hdu2]
244
245
        data = matrix_hdu.data
246
        header = matrix_hdu.header
247
248
        pdf_matrix = np.zeros([len(data), header["DETCHANS"]], dtype=np.float64)
249
250
        for i, l in enumerate(data):
251
            if l.field("N_GRP"):
252
                m_start = 0
253
                for k in range(l.field("N_GRP")):
254
                    pdf_matrix[
255
                        i,
256
                        l.field("F_CHAN")[k] : l.field("F_CHAN")[k]
257
                        + l.field("N_CHAN")[k],
258
                    ] = l.field("MATRIX")[m_start : m_start + l.field("N_CHAN")[k]]
259
                    m_start += l.field("N_CHAN")[k]
260
261
        e_reco = EnergyBounds.from_ebounds(ebounds_hdu)
262
        e_true = EnergyBounds.from_rmf_matrix(matrix_hdu)
263
264
        return cls(
265
            e_true_lo=e_true.lower_bounds,
266
            e_true_hi=e_true.upper_bounds,
267
            e_reco_lo=e_reco.lower_bounds,
268
            e_reco_hi=e_reco.upper_bounds,
269
            data=pdf_matrix,
270
        )
271
272
    @classmethod
273
    def read(cls, filename, hdu1="MATRIX", hdu2="EBOUNDS"):
274
        """Read from file.
275
276
        Parameters
277
        ----------
278
        filename : `~gammapy.extern.pathlib.Path`, str
279
            File to read
280
        hdu1 : str, optional
281
            HDU containing the energy dispersion matrix, default: MATRIX
282
        hdu2 : str, optional
283
            HDU containing the energy axis information, default, EBOUNDS
284
        """
285
        filename = make_path(filename)
286
        with fits.open(str(filename), memmap=False) as hdulist:
287
            edisp = cls.from_hdulist(hdulist, hdu1=hdu1, hdu2=hdu2)
288
289
        return edisp
290
291
    def to_hdulist(self, **kwargs):
292
        """Convert RMF to FITS HDU list format.
293
294
        Parameters
295
        ----------
296
        header : `~astropy.io.fits.Header`
297
            Header to be written in the fits file.
298
        energy_unit : str
299
            Unit in which the energy is written in the HDU list
300
301
        Returns
302
        -------
303
        hdulist : `~astropy.io.fits.HDUList`
304
            RMF in HDU list format.
305
306
        Notes
307
        -----
308
        For more info on the RMF FITS file format see:
309
        https://heasarc.gsfc.nasa.gov/docs/heasarc/caldb/docs/summary/cal_gen_92_002_summary.html
310
        """
311
        # Cannot use table_to_fits here due to variable length array
312
        # http://docs.astropy.org/en/v1.0.4/io/fits/usage/unfamiliar.html
313
314
        table = self.to_table()
315
        name = table.meta.pop("name")
316
317
        header = fits.Header()
318
        header.update(table.meta)
319
320
        cols = table.columns
321
        c0 = fits.Column(
322
            name=cols[0].name, format="E", array=cols[0], unit="{}".format(cols[0].unit)
323
        )
324
        c1 = fits.Column(
325
            name=cols[1].name, format="E", array=cols[1], unit="{}".format(cols[1].unit)
326
        )
327
        c2 = fits.Column(name=cols[2].name, format="I", array=cols[2])
328
        c3 = fits.Column(name=cols[3].name, format="PI()", array=cols[3])
329
        c4 = fits.Column(name=cols[4].name, format="PI()", array=cols[4])
330
        c5 = fits.Column(name=cols[5].name, format="PE()", array=cols[5])
331
332
        hdu = fits.BinTableHDU.from_columns(
333
            [c0, c1, c2, c3, c4, c5], header=header, name=name
334
        )
335
336
        ebounds = energy_axis_to_ebounds(self.e_reco.bins)
337
        prim_hdu = fits.PrimaryHDU()
338
339
        return fits.HDUList([prim_hdu, hdu, ebounds])
340
341
    def to_table(self):
342
        """Convert to `~astropy.table.Table`.
343
344
        The output table is in the OGIP RMF format.
345
        https://heasarc.gsfc.nasa.gov/docs/heasarc/caldb/docs/memos/cal_gen_92_002/cal_gen_92_002.html#Tab:1
346
        """
347
        rows = self.pdf_matrix.shape[0]
348
        n_grp = []
349
        f_chan = np.ndarray(dtype=np.object, shape=rows)
350
        n_chan = np.ndarray(dtype=np.object, shape=rows)
351
        matrix = np.ndarray(dtype=np.object, shape=rows)
352
353
        # Make RMF type matrix
354
        for i, row in enumerate(self.data.data.value):
355
            pos = np.nonzero(row)[0]
356
            borders = np.where(np.diff(pos) != 1)[0]
357
            # add 1 to borders for correct behaviour of np.split
358
            groups = np.asarray(np.split(pos, borders + 1))
359
            n_grp_temp = groups.shape[0] if groups.size > 0 else 1
360
            n_chan_temp = np.asarray([val.size for val in groups])
361
            try:
362
                f_chan_temp = np.asarray([val[0] for val in groups])
363
            except IndexError:
364
                f_chan_temp = np.zeros(1)
365
366
            n_grp.append(n_grp_temp)
367
            f_chan[i] = f_chan_temp
368
            n_chan[i] = n_chan_temp
369
            matrix[i] = row[pos]
370
371
        n_grp = np.asarray(n_grp, dtype=np.int16)
372
373
        # Get total number of groups and channel subsets
374
        numgrp, numelt = 0, 0
375
        for val, val2 in zip(n_grp, n_chan):
376
            numgrp += np.sum(val)
377
            numelt += np.sum(val2)
378
379
        table = Table()
380
381
        table["ENERG_LO"] = self.e_true.lo
382
        table["ENERG_HI"] = self.e_true.hi
383
        table["N_GRP"] = n_grp
384
        table["F_CHAN"] = f_chan
385
        table["N_CHAN"] = n_chan
386
        table["MATRIX"] = matrix
387
388
        table.meta = OrderedDict(
389
            [
390
                ("name", "MATRIX"),
391
                ("chantype", "PHA"),
392
                ("hduclass", "OGIP"),
393
                ("hduclas1", "RESPONSE"),
394
                ("hduclas2", "RSP_MATRIX"),
395
                ("detchans", self.e_reco.nbins),
396
                ("numgrp", numgrp),
397
                ("numelt", numelt),
398
                ("tlmin4", 0),
399
            ]
400
        )
401
402
        return table
403
404
    def write(self, filename, **kwargs):
405
        """Write to file."""
406
        filename = make_path(filename)
407
        self.to_hdulist().writeto(str(filename), **kwargs)
408
409
    def get_resolution(self, e_true):
410
        """Get energy resolution for a given true energy.
411
412
        The resolution is given as a percentage of the true energy
413
414
        Parameters
415
        ----------
416
        e_true : `~astropy.units.Quantity`
417
            True energy
418
        """
419
        var = self._get_variance(e_true)
420
        idx_true = self.e_true.find_node(e_true)
421
        e_true_real = self.e_true.nodes[idx_true]
422
        return np.sqrt(var) / e_true_real
423
424
    def get_bias(self, e_true):
425
        r"""Get reconstruction bias for a given true energy.
426
427
        Bias is defined as
428
429
        .. math::
430
431
            \frac{E_{reco}-E_{true}}{E_{true}}
432
433
        Parameters
434
        ----------
435
        e_true : `~astropy.units.Quantity`
436
            True energy
437
        """
438
        e_reco = self.get_mean(e_true)
439
        idx_true = self.e_true.find_node(e_true)
440
        e_true_real = self.e_true.nodes[idx_true]
441
        bias = (e_reco - e_true_real) / e_true_real
442
        return bias
443
444
    def get_mean(self, e_true):
445
        """Get mean reconstructed energy for a given true energy."""
446
        # find pdf for true energies
447
        idx = self.e_true.find_node(e_true)
448
        pdf = self.data.data[idx]
449
450
        # compute sum along reconstructed energy
451
        # axis to determine the mean
452
        norm = np.sum(pdf, axis=-1)
453
        temp = np.sum(pdf * self.e_reco.nodes, axis=-1)
454
455
        return temp / norm
456
457
    def _get_variance(self, e_true):
458
        """Get variance of log reconstructed energy."""
459
        # evaluate the pdf at given true energies
460
        idx = self.e_true.find_node(e_true)
461
        pdf = self.data.data[idx]
462
463
        # compute mean
464
        mean = self.get_mean(e_true)
465
466
        # create array of reconstructed-energy nodes
467
        # for each given true energy value
468
        # (first axis is reconstructed energy)
469
        erec = self.e_reco.nodes
470
        erec = np.repeat(erec, max(np.sum(mean.shape), 1)).reshape(
471
            erec.shape + mean.shape
472
        )
473
474
        # compute deviation from mean
475
        # (and move reconstructed energy axis to last axis)
476
        temp_ = (erec - mean) ** 2
477
        temp = np.rollaxis(temp_, 1)
478
479
        # compute sum along reconstructed energy
480
        # axis to determine the variance
481
        norm = np.sum(pdf, axis=-1)
482
        var = np.sum(temp * pdf, axis=-1)
483
484
        return var / norm
485
486
    def to_sherpa(self, name):
487
        """Convert to `sherpa.astro.data.DataRMF`.
488
489
        Parameters
490
        ----------
491
        name : str
492
            Instance name
493
        """
494
        from sherpa.astro.data import DataRMF
495
        from sherpa.utils import SherpaUInt, SherpaFloat
496
497
        # Need to modify RMF data
498
        # see https://github.com/sherpa/sherpa/blob/master/sherpa/astro/io/pyfits_backend.py#L727
499
500
        table = self.to_table()
501
        n_grp = table["N_GRP"].data.astype(SherpaUInt)
502
        f_chan = table["F_CHAN"].data
503
        n_chan = table["N_CHAN"].data
504
        matrix = table["MATRIX"].data
505
506
        good = n_grp > 0
507
        matrix = matrix[good]
508
        matrix = np.concatenate([row for row in matrix])
509
        matrix = matrix.astype(SherpaFloat)
510
511
        good = n_grp > 0
512
        f_chan = f_chan[good]
513
        f_chan = np.concatenate([row for row in f_chan]).astype(SherpaUInt)
514
        n_chan = n_chan[good]
515
        n_chan = np.concatenate([row for row in n_chan]).astype(SherpaUInt)
516
517
        return DataRMF(
518
            name=name,
519
            energ_lo=table["ENERG_LO"].quantity.to_value("keV").astype(SherpaFloat),
520
            energ_hi=table["ENERG_HI"].quantity.to_value("keV").astype(SherpaFloat),
521
            matrix=matrix,
522
            n_grp=n_grp,
523
            n_chan=n_chan,
524
            f_chan=f_chan,
525
            detchans=self.e_reco.nbins,
526
            e_min=self.e_reco.lo.to_value("keV"),
527
            e_max=self.e_reco.hi.to_value("keV"),
528
            offset=0,
529
        )
530
531
    def plot_matrix(self, ax=None, show_energy=None, add_cbar=False, **kwargs):
532
        """Plot PDF matrix.
533
534
        Parameters
535
        ----------
536
        ax : `~matplotlib.axes.Axes`, optional
537
            Axis
538
        show_energy : `~astropy.units.Quantity`, optional
539
            Show energy, e.g. threshold, as vertical line
540
        add_cbar : bool
541
            Add a colorbar to the plot.
542
543
        Returns
544
        -------
545
        ax : `~matplotlib.axes.Axes`
546
            Axis
547
        """
548
        import matplotlib.pyplot as plt
549
        from matplotlib.colors import PowerNorm
550
551
        kwargs.setdefault("cmap", "GnBu")
552
        norm = PowerNorm(gamma=0.5)
553
        kwargs.setdefault("norm", norm)
554
555
        ax = plt.gca() if ax is None else ax
556
557
        e_true = self.e_true.bins
558
        e_reco = self.e_reco.bins
559
        x = e_true.value
560
        y = e_reco.value
561
        z = self.pdf_matrix
562
        caxes = ax.pcolormesh(x, y, z.T, **kwargs)
563
564
        if show_energy is not None:
565
            ener_val = show_energy.to_value(self.reco_energy.unit)
566
            ax.hlines(ener_val, 0, 200200, linestyles="dashed")
567
568
        if add_cbar:
569
            label = "Probability density (A.U.)"
570
            cbar = ax.figure.colorbar(caxes, ax=ax, label=label)
571
572
        ax.set_xlabel("$E_\mathrm{{True}}$ [{unit}]".format(unit=e_true.unit))
573
        ax.set_ylabel("$E_\mathrm{{Reco}}$ [{unit}]".format(unit=e_reco.unit))
574
        ax.set_xscale("log")
575
        ax.set_yscale("log")
576
        ax.set_xlim(x.min(), x.max())
577
        ax.set_ylim(y.min(), y.max())
578
        return ax
579
580
    def plot_bias(self, ax=None, **kwargs):
581
        """Plot reconstruction bias.
582
583
        See `~gammapy.irf.EnergyDispersion.get_bias` method.
584
585
        Parameters
586
        ----------
587
        ax : `~matplotlib.axes.Axes`, optional
588
            Axis
589
        """
590
        import matplotlib.pyplot as plt
591
592
        ax = plt.gca() if ax is None else ax
593
594
        x = self.e_true.nodes.to_value("TeV")
595
        y = self.get_bias(self.e_true.nodes)
596
597
        ax.plot(x, y, **kwargs)
598
        ax.set_xlabel("$E_\mathrm{{True}}$ [TeV]")
599
        ax.set_ylabel(r"($E_\mathrm{{True}} - E_\mathrm{{Reco}} / E_\mathrm{{True}}$)")
600
        ax.set_xscale("log")
601
        return ax
602
603
    def peek(self, figsize=(15, 5)):
604
        """Quick-look summary plot."""
605
        import matplotlib.pyplot as plt
606
607
        fig, axes = plt.subplots(nrows=1, ncols=2, figsize=figsize)
608
        self.plot_bias(ax=axes[0])
609
        self.plot_matrix(ax=axes[1])
610
        plt.tight_layout()
611
612
613
class EnergyDispersion2D(object):
614
    """Offset-dependent energy dispersion matrix.
615
616
    Data format specification: :ref:`gadf:edisp_2d`
617
618
    Parameters
619
    ----------
620
    e_true_lo, e_true_hi : `~astropy.units.Quantity`
621
        True energy axis binning
622
    migra_lo, migra_hi : `~numpy.ndarray`
623
        Energy migration axis binning
624
    offset_lo, offset_hi : `~astropy.coordinates.Angle`
625
        Field of view offset axis binning
626
    data : `~numpy.ndarray`
627
        Energy dispersion probability density
628
629
    Examples
630
    --------
631
    Read energy dispersion IRF from disk:
632
633
    >>> from gammapy.irf import EnergyDispersion2D
634
    >>> from gammapy.utils.energy import EnergyBounds
635
    >>> filename = '$GAMMAPY_EXTRA/test_datasets/irf/hess/pa/hess_edisp_2d_023523.fits.gz'
636
    >>> edisp2d = EnergyDispersion2D.read(filename, hdu='ENERGY DISPERSION')
637
    >>> print(edisp2d)
638
    EnergyDispersion2D
639
    NDDataArray summary info
640
    e_true         : size =    15, min =  0.125 TeV, max = 80.000 TeV
641
    migra          : size =   100, min =  0.051, max = 10.051
642
    offset         : size =     6, min =  0.125 deg, max =  2.500 deg
643
    Data           : size =  9000, min =  0.000, max =  3.405
644
645
    Create energy dispersion matrix (`~gammapy.irf.EnergyDispersion`)
646
    for a given field of view offset and energy binning:
647
648
    >>> energy = EnergyBounds.equal_log_spacing(0.1,20,60, 'TeV')
649
    >>> offset = '1.2 deg'
650
    >>> edisp = edisp2d.to_energy_dispersion(offset=offset, e_reco=energy, e_true=energy)
651
    >>> print(edisp)
652
    EnergyDispersion
653
    NDDataArray summary info
654
    e_true         : size =    60, min =  0.105 TeV, max = 19.136 TeV
655
    e_reco         : size =    60, min =  0.105 TeV, max = 19.136 TeV
656
    Data           : size =  3600, min =  0.000, max =  0.266
657
658
    See Also
659
    --------
660
    EnergyDispersion
661
    """
662
663
    default_interp_kwargs = dict(bounds_error=False, fill_value=None)
664
    """Default Interpolation kwargs for `~gammapy.utils.nddata.NDDataArray`. Extrapolate."""
665
666
    def __init__(
667
        self,
668
        e_true_lo,
669
        e_true_hi,
670
        migra_lo,
671
        migra_hi,
672
        offset_lo,
673
        offset_hi,
674
        data,
675
        interp_kwargs=None,
676
        meta=None,
677
    ):
678
        if interp_kwargs is None:
679
            interp_kwargs = self.default_interp_kwargs
680
        axes = [
681
            BinnedDataAxis(
682
                e_true_lo, e_true_hi, interpolation_mode="log", name="e_true"
683
            ),
684
            BinnedDataAxis(
685
                migra_lo, migra_hi, interpolation_mode="linear", name="migra"
686
            ),
687
            BinnedDataAxis(
688
                offset_lo, offset_hi, interpolation_mode="linear", name="offset"
689
            ),
690
        ]
691
        self.data = NDDataArray(axes=axes, data=data, interp_kwargs=interp_kwargs)
692
        self.meta = OrderedDict(meta) if meta else OrderedDict()
693
694
    def __str__(self):
695
        ss = self.__class__.__name__
696
        ss += "\n{}".format(self.data)
697
        return ss
698
699
    @classmethod
700
    def from_gauss(cls, e_true, migra, bias, sigma, offset, pdf_threshold=1e-6):
701
        """Create Gaussian energy dispersion matrix (`EnergyDispersion2D`).
702
703
        The output matrix will be Gaussian in (e_true / e_reco).
704
705
        The ``bias`` and ``sigma`` should be either floats or arrays of same dimension than
706
        ``e_true``. ``bias`` refers to the mean value of the ``migra``
707
        distribution minus one, i.e. ``bias=0`` means no bias.
708
709
        Note that, the output matrix is flat in offset.
710
711
        Parameters
712
        ----------
713
        e_true : `~astropy.units.Quantity`, `~gammapy.utils.nddata.BinnedDataAxis`
714
            Bin edges of true energy axis
715
        migra : `~astropy.units.Quantity`, `~gammapy.utils.nddata.BinnedDataAxis`
716
            Bin edges of migra axis
717
        bias : float or `~numpy.ndarray`
718
            Center of Gaussian energy dispersion, bias
719
        sigma : float or `~numpy.ndarray`
720
            RMS width of Gaussian energy dispersion, resolution
721
        offset : `~astropy.units.Quantity`, `~gammapy.utils.nddata.BinnedDataAxis`
722
            Bin edges of offset
723
        pdf_threshold : float, optional
724
            Zero suppression threshold
725
        """
726
        from scipy.special import erf
727
728
        e_true = EnergyBounds(e_true)
729
        # erf does not work with Quantities
730
        true = e_true.log_centers.to_value("TeV")
731
732
        true2d, migra2d = np.meshgrid(true, migra)
733
734
        migra2d_lo = migra2d[:-1, :]
735
        migra2d_hi = migra2d[1:, :]
736
737
        # Analytical formula for integral of Gaussian
738
        s = np.sqrt(2) * sigma
739
        t1 = (migra2d_hi - 1 - bias) / s
740
        t2 = (migra2d_lo - 1 - bias) / s
741
        pdf = (erf(t1) - erf(t2)) / 2
742
743
        pdf_array = pdf.T[:, :, np.newaxis] * np.ones(len(offset) - 1)
744
745
        pdf_array = np.where(pdf_array > pdf_threshold, pdf_array, 0)
746
747
        return cls(
748
            e_true[:-1],
749
            e_true[1:],
750
            migra[:-1],
751
            migra[1:],
752
            offset[:-1],
753
            offset[1:],
754
            pdf_array,
755
        )
756
757
    @classmethod
758
    def from_table(cls, table):
759
        """Create from `~astropy.table.Table`."""
760
        if "ENERG_LO" in table.colnames:
761
            e_lo = table["ENERG_LO"].quantity[0]
762
            e_hi = table["ENERG_HI"].quantity[0]
763
        elif "ETRUE_LO" in table.colnames:
764
            e_lo = table["ETRUE_LO"].quantity[0]
765
            e_hi = table["ETRUE_HI"].quantity[0]
766
        else:
767
            raise ValueError(
768
                'Invalid column names. Need "ENERG_LO/ENERG_HI" or "ETRUE_LO/ETRUE_HI"'
769
            )
770
        o_lo = table["THETA_LO"].quantity[0]
771
        o_hi = table["THETA_HI"].quantity[0]
772
        m_lo = table["MIGRA_LO"].quantity[0]
773
        m_hi = table["MIGRA_HI"].quantity[0]
774
775
        matrix = (
776
            table["MATRIX"].quantity[0].transpose()
777
        )  ## TODO Why does this need to be transposed?
778
        return cls(
779
            e_true_lo=e_lo,
780
            e_true_hi=e_hi,
781
            offset_lo=o_lo,
782
            offset_hi=o_hi,
783
            migra_lo=m_lo,
784
            migra_hi=m_hi,
785
            data=matrix,
786
        )
787
788
    @classmethod
789
    def from_hdulist(cls, hdulist, hdu="edisp_2d"):
790
        """Create from `~astropy.io.fits.HDUList`."""
791
        return cls.from_table(Table.read(hdulist[hdu]))
792
793
    @classmethod
794
    def read(cls, filename, hdu="edisp_2d"):
795
        """Read from FITS file.
796
797
        Parameters
798
        ----------
799
        filename : str
800
            File name
801
        """
802
        filename = make_path(filename)
803
        with fits.open(str(filename), memmap=False) as hdulist:
804
            edisp = cls.from_hdulist(hdulist, hdu)
805
806
        return edisp
807
808
    def to_energy_dispersion(self, offset, e_true=None, e_reco=None):
809
        """Detector response R(Delta E_reco, Delta E_true)
810
811
        Probability to reconstruct an energy in a given true energy band
812
        in a given reconstructed energy band
813
814
        Parameters
815
        ----------
816
        offset : `~astropy.coordinates.Angle`
817
            Offset
818
        e_true : `~gammapy.utils.energy.EnergyBounds`, None
819
            True energy axis
820
        e_reco : `~gammapy.utils.energy.EnergyBounds`
821
            Reconstructed energy axis
822
823
        Returns
824
        -------
825
        edisp : `~gammapy.irf.EnergyDispersion`
826
            Energy dispersion matrix
827
        """
828
        offset = Angle(offset)
829
        e_true = self.data.axis("e_true").bins if e_true is None else e_true
830
        e_reco = self.data.axis("e_true").bins if e_reco is None else e_reco
831
        e_true = EnergyBounds(e_true)
832
        e_reco = EnergyBounds(e_reco)
833
834
        data = []
835
        for energy in e_true.log_centers:
836
            vec = self.get_response(offset=offset, e_true=energy, e_reco=e_reco)
837
            data.append(vec)
838
839
        data = np.asarray(data)
840
        e_lo, e_hi = e_true[:-1], e_true[1:]
841
        ereco_lo, ereco_hi = (e_reco[:-1], e_reco[1:])
842
843
        return EnergyDispersion(
844
            e_true_lo=e_lo,
845
            e_true_hi=e_hi,
846
            e_reco_lo=ereco_lo,
847
            e_reco_hi=ereco_hi,
848
            data=data,
849
        )
850
851
    def get_response(self, offset, e_true, e_reco=None, migra_step=5e-3):
852
        """Detector response R(Delta E_reco, E_true)
853
854
        Probability to reconstruct a given true energy in a given reconstructed
855
        energy band. In each reco bin, you integrate with a riemann sum over
856
        the default migra bin of your analysis.
857
858
        Parameters
859
        ----------
860
        e_true : `~gammapy.utils.energy.Energy`
861
            True energy
862
        e_reco : `~gammapy.utils.energy.EnergyBounds`, None
863
            Reconstructed energy axis
864
        offset : `~astropy.coordinates.Angle`
865
            Offset
866
        migra_step : float
867
            Integration step in migration
868
869
        Returns
870
        -------
871
        rv : `~numpy.ndarray`
872
            Redistribution vector
873
        """
874
        e_true = Energy(e_true)
875
876
        if e_reco is None:
877
            # Default: e_reco nodes = migra nodes * e_true nodes
878
            migra_axis = self.data.axis("migra")
879
            e_reco = EnergyBounds.from_lower_and_upper_bounds(
880
                migra_axis.lo * e_true, migra_axis.hi * e_true
881
            )
882
        else:
883
            # Translate given e_reco binning to migra at bin center
884
            e_reco = EnergyBounds(e_reco)
885
886
        # migration value of e_reco bounds
887
        migra_e_reco = e_reco / e_true
888
889
        # Define a vector of migration with mig_step step
890
        mrec_min = self.data.axis("migra").lo[0]
891
        mrec_max = self.data.axis("migra").hi[-1]
892
        mig_array = np.arange(mrec_min, mrec_max, migra_step)
893
894
        # Compute energy dispersion probability dP/dm for each element of migration array
895
        vals = self.data.evaluate(offset=offset, e_true=e_true, migra=mig_array)
896
897
        # Compute normalized cumulative sum to prepare integration
898
        with np.errstate(invalid="ignore"):
899
            tmp = np.nan_to_num(np.cumsum(vals) / np.sum(vals))
900
901
        # Determine positions (bin indices) of e_reco bounds in migration array
902
        pos_mig = np.digitize(migra_e_reco, mig_array) - 1
903
        # We ensure that no negative values are found
904
        pos_mig = np.maximum(pos_mig, 0)
905
906
        # We compute the difference between 2 successive bounds in e_reco
907
        # to get integral over reco energy bin
908
        integral = np.diff(tmp[pos_mig])
909
910
        return integral
911
912
    def plot_migration(self, ax=None, offset=None, e_true=None, migra=None, **kwargs):
913
        """Plot energy dispersion for given offset and true energy.
914
915
        Parameters
916
        ----------
917
        ax : `~matplotlib.axes.Axes`, optional
918
            Axis
919
        offset : `~astropy.coordinates.Angle`, optional
920
            Offset
921
        e_true : `~gammapy.utils.energy.Energy`, optional
922
            True energy
923
        migra : `~numpy.array`, list, optional
924
            Migration nodes
925
926
        Returns
927
        -------
928
        ax : `~matplotlib.axes.Axes`
929
            Axis
930
        """
931
        import matplotlib.pyplot as plt
932
933
        ax = plt.gca() if ax is None else ax
934
935
        if offset is None:
936
            offset = Angle([1], "deg")
937
        else:
938
            offset = np.atleast_1d(Angle(offset))
939
        if e_true is None:
940
            e_true = Energy([0.1, 1, 10], "TeV")
941
        else:
942
            e_true = np.atleast_1d(Energy(e_true))
943
        migra = self.data.axis("migra").nodes if migra is None else migra
944
945
        for ener in e_true:
946
            for off in offset:
947
                disp = self.data.evaluate(offset=off, e_true=ener, migra=migra)
948
                label = "offset = {0:.1f}\nenergy = {1:.1f}".format(off, ener)
949
                ax.plot(migra, disp, label=label, **kwargs)
950
951
        ax.set_xlabel("$E_\mathrm{{Reco}} / E_\mathrm{{True}}$")
952
        ax.set_ylabel("Probability density")
953
        ax.legend(loc="upper left")
954
955
        return ax
956
957
    def plot_bias(self, ax=None, offset=None, add_cbar=False, **kwargs):
958
        """Plot migration as a function of true energy for a given offset.
959
960
        Parameters
961
        ----------
962
        ax : `~matplotlib.axes.Axes`, optional
963
            Axis
964
        offset : `~astropy.coordinates.Angle`, optional
965
            Offset
966
        add_cbar : bool
967
            Add a colorbar to the plot.
968
        kwargs : dict
969
            Keyword arguments passed to `~matplotlib.pyplot.pcolormesh`.
970
971
        Returns
972
        -------
973
        ax : `~matplotlib.axes.Axes`
974
            Axis
975
        """
976
        from matplotlib.colors import PowerNorm
977
        import matplotlib.pyplot as plt
978
979
        kwargs.setdefault("cmap", "GnBu")
980
        kwargs.setdefault("norm", PowerNorm(gamma=0.5))
981
982
        ax = plt.gca() if ax is None else ax
983
984
        if offset is None:
985
            offset = Angle([1], "deg")
986
987
        e_true = self.data.axis("e_true").bins
988
        migra = self.data.axis("migra").bins
989
990
        x = e_true.value
991
        y = migra.value
992
        z = self.data.evaluate(offset=offset, e_true=e_true, migra=migra).value
993
994
        caxes = ax.pcolormesh(x, y, z.T, **kwargs)
995
996
        if add_cbar:
997
            label = "Probability density (A.U.)"
998
            ax.figure.colorbar(caxes, ax=ax, label=label)
999
1000
        ax.set_xlabel("$E_\mathrm{{True}}$ [{unit}]".format(unit=e_true.unit))
1001
        ax.set_ylabel("$E_\mathrm{{Reco}} / E_\mathrm{{True}}$")
1002
        ax.set_xlim(x.min(), x.max())
1003
        ax.set_ylim(y.min(), y.max())
1004
        ax.set_xscale("log")
1005
        return ax
1006
1007
    def peek(self, figsize=(15, 5)):
1008
        """Quick-look summary plots.
1009
1010
        Parameters
1011
        ----------
1012
        figsize : (float, float)
1013
            Size of the resulting plot
1014
        """
1015
        import matplotlib.pyplot as plt
1016
1017
        fig, axes = plt.subplots(nrows=1, ncols=3, figsize=figsize)
1018
        self.plot_bias(ax=axes[0])
1019
        self.plot_migration(ax=axes[1])
1020
        edisp = self.to_energy_dispersion(offset="1 deg")
1021
        edisp.plot_matrix(ax=axes[2])
1022
1023
        plt.tight_layout()
1024
1025 View Code Duplication
    def to_table(self):
1026
        """Convert to `~astropy.table.Table`."""
1027
        meta = self.meta.copy()
1028
        table = Table(meta=meta)
1029
        table["ENERG_LO"] = self.data.axis("e_true").lo[np.newaxis]
1030
        table["ENERG_HI"] = self.data.axis("e_true").hi[np.newaxis]
1031
        table["MIGRA_LO"] = self.data.axis("migra").hi[np.newaxis]
1032
        table["MIGRA_HI"] = self.data.axis("migra").hi[np.newaxis]
1033
        table["THETA_LO"] = self.data.axis("offset").lo[np.newaxis]
1034
        table["THETA_HI"] = self.data.axis("offset").hi[np.newaxis]
1035
        table["MATRIX"] = self.data.data.T[np.newaxis]
1036
        return table
1037
1038
    def to_fits(self, name="ENERGY DISPERSION"):
1039
        """Convert to `~astropy.io.fits.BinTable`."""
1040
        return fits.BinTableHDU(self.to_table(), name=name)
1041