Passed
Pull Request — main (#160) by Chaitanya, created 01:30

asgardpy.stats.stats.fetch_pivot_energy()   A

Complexity

Conditions 3

Size

Total Lines 36
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric  Value
cc      3
eloc    13
nop     1
dl      0
loc     36
rs      9.75
c       0
b       0
f       0
"""
Module for performing some statistic functions.
"""

import numpy as np
from gammapy.modeling.models import CompoundSpectralModel
from gammapy.stats.fit_statistics import cash, wstat
from scipy.optimize import minimize_scalar
from scipy.stats import chi2, norm

__all__ = [
    "check_model_preference_aic",
    "check_model_preference_lrt",
    "fetch_pivot_energy",
    "get_chi2_sig_pval",
    "get_goodness_of_fit_stats",
    "get_ts_target",
]


def get_chi2_sig_pval(test_stat, ndof):
    """
    Using the test statistic value from fitting a model to data, along with the
    total number of degrees of freedom, evaluate the significance in terms of
    Gaussian standard deviations, along with the one-tailed p-value of the fit
    statistic.

    In Gammapy, the cash statistic is used for 3D analysis, while the wstat
    statistic is used for 1D analysis. Check the documentation for more details:
    https://docs.gammapy.org/1.1/user-guide/stats/index.html

    Parameters
    ----------
    test_stat: float
        The test statistic (-2 ln L) value of the fit.
    ndof: int
        Total number of degrees of freedom.

    Returns
    -------
    chi2_sig: float
        Significance of the likelihood of the fit model, expressed in Gaussian
        standard deviations.
    pval: float
        One-tailed p-value for the model fit.
    """
    pval = chi2.sf(test_stat, ndof)
    chi2_sig = norm.isf(pval / 2)

    return chi2_sig, pval
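

# Illustrative usage sketch (not part of the original module): converting a
# made-up goodness-of-fit test statistic with 10 degrees of freedom into a
# one-tailed p-value and the equivalent Gaussian significance.
def _example_get_chi2_sig_pval():
    chi2_sig, pval = get_chi2_sig_pval(test_stat=25.0, ndof=10)
    print(f"Significance: {chi2_sig:.2f} sigma, p-value: {pval:.3e}")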


def check_model_preference_lrt(test_stat_1, test_stat_2, ndof_1, ndof_2):
    """
    Log-likelihood ratio test for checking the preference of a "nested"
    spectral model (model 2) over a primary model (model 1).

    Parameters
    ----------
    test_stat_1: float
        The test statistic (-2 ln L) of the Fit result of the primary spectral model.
    test_stat_2: float
        The test statistic (-2 ln L) of the Fit result of the nested spectral model.
    ndof_1: int
        Number of degrees of freedom for the primary model.
    ndof_2: int
        Number of degrees of freedom for the nested model.

    Returns
    -------
    p_value: float
        p-value for the ratio of the likelihoods.
    gaussian_sigmas: float
        Significance of the likelihood ratio, expressed in Gaussian standard
        deviations.
    n_dof: int
        Difference in the number of degrees of freedom (free parameters)
        between the primary and nested models.
    """
    n_dof = ndof_1 - ndof_2

    if n_dof < 1:
        print(f"DoF of the primary model ({ndof_1}) is not larger than that of the nested model ({ndof_2})")

        return np.nan, np.nan, n_dof

    gaussian_sigmas, p_value = get_chi2_sig_pval(test_stat_1 - test_stat_2, n_dof)

    return p_value, gaussian_sigmas, n_dof
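

# Illustrative usage sketch (not part of the original module): comparing a
# primary best-fit model against a nested model that has one extra free
# parameter, following this function's convention that ndof_1 > ndof_2.
# The fit statistics and degrees of freedom below are made up.
def _example_check_model_preference_lrt():
    stat_primary, dof_primary = 112.3, 30
    stat_nested, dof_nested = 104.9, 29

    p_value, sigmas, n_dof = check_model_preference_lrt(
        stat_primary, stat_nested, dof_primary, dof_nested
    )
    print(f"p-value: {p_value:.3e}, significance: {sigmas:.2f} sigma, extra DoF: {n_dof}")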


def check_model_preference_aic(list_stat, list_dof):
    """
    Akaike Information Criterion (AIC) preference over a list of fit statistics
    and degrees of freedom (DoF), to get the relative likelihood of a given
    list of best-fit models.

    Parameters
    ----------
    list_stat: list
        List of test statistic (-2 ln L) values for a list of models.
    list_dof: list
        List of degrees of freedom or list of free parameters, for a list of models.

    Returns
    -------
    list_rel_p: list
        List of relative likelihood probabilities, for a list of models.
    """
    list_aic_stat = []
    for stat, dof in zip(list_stat, list_dof, strict=True):
        aic_stat = stat + 2 * dof
        list_aic_stat.append(aic_stat)
    list_aic_stat = np.array(list_aic_stat)

    aic_stat_min = np.min(list_aic_stat)

    list_b_stat = []
    for aic in list_aic_stat:
        b_stat = np.exp((aic_stat_min - aic) / 2)
        list_b_stat.append(b_stat)
    list_b_stat = np.array(list_b_stat)

    list_rel_p = []
    for b_stat in list_b_stat:
        rel_p = b_stat / np.sum(list_b_stat)
        list_rel_p.append(rel_p)
    list_rel_p = np.array(list_rel_p)

    return list_rel_p
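

# Illustrative usage sketch (not part of the original module): relative
# likelihoods (AIC weights) for three made-up best-fit results, where
# AIC = stat + 2 * dof and the model with the highest relative probability
# is preferred.
def _example_check_model_preference_aic():
    list_stat = [112.3, 104.9, 103.8]  # -2 ln L of each best-fit model
    list_dof = [2, 3, 4]  # number of free parameters of each model

    list_rel_p = check_model_preference_aic(list_stat, list_dof)
    for i, rel_p in enumerate(list_rel_p):
        print(f"Model {i}: relative likelihood {rel_p:.3f}")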


def get_goodness_of_fit_stats(datasets, instrument_spectral_info):
    """
    Evaluate the goodness-of-fit statistics of the fit of the model to the
    dataset.

    We first use the get_ts_target function to get the total test statistic for
    the (observed) best fit of the model to the data, and for the (expected)
    perfect fit of model and data (model = data), in the given target source
    region/pixel.

    The total number of Degrees of Freedom for the Fit is the difference
    between the number of relevant energy bins used in the evaluation and the
    number of free model parameters.

    The fit statistics difference is used as the test statistic value for the
    get_chi2_sig_pval function, along with the total number of degrees of
    freedom, to get the final statistics for the goodness of fit.

    The fit statistics information is updated in the dict object provided and
    a logging message is returned.

    Parameters
    ----------
    datasets: `gammapy.datasets.Datasets`
        List of Datasets object, which can contain 3D and/or 1D datasets
    instrument_spectral_info: dict
        Dict of information for storing relevant fit stats

    Returns
    -------
    instrument_spectral_info: dict
        Filled Dict of information with relevant fit statistics
    stat_message: str
        String for logging the fit statistics
    """
    stat_best_fit, stat_max_fit = get_ts_target(datasets)

    instrument_spectral_info["max_fit_stat"] = stat_max_fit
    instrument_spectral_info["best_fit_stat"] = stat_best_fit
    ndof = instrument_spectral_info["DoF"]
    stat_diff_gof = stat_best_fit - stat_max_fit

    fit_chi2_sig, fit_pval = get_chi2_sig_pval(stat_diff_gof, ndof)

    instrument_spectral_info["fit_chi2_sig"] = fit_chi2_sig
    instrument_spectral_info["fit_pval"] = fit_pval

    stat_message = "The Chi2/dof value of the goodness of Fit is "
    stat_message += f"{stat_diff_gof:.2f}/{ndof}\nand the p-value is {fit_pval:.3e} "
    stat_message += f"and in Significance {fit_chi2_sig:.2f} sigmas"
    stat_message += f"\nwith best fit TS (Observed) as {stat_best_fit:.3f} "
    stat_message += f"and max fit TS (Expected) as {stat_max_fit:.3f}"

    return instrument_spectral_info, stat_message
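

# Illustrative usage sketch (not part of the original module): `fitted_datasets`
# is assumed to be a `gammapy.datasets.Datasets` object with fitted models
# attached, and the "DoF" entry (relevant energy bins minus free model
# parameters) is assumed to have been filled beforehand; the value here is a
# placeholder.
def _example_get_goodness_of_fit_stats(fitted_datasets):
    info = {"DoF": 25}
    info, stat_message = get_goodness_of_fit_stats(fitted_datasets, info)
    print(stat_message)
    print(info["fit_chi2_sig"], info["fit_pval"])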


def get_ts_target(datasets):
    """
    From a given list of DL4 datasets, with assumed associated models, estimate
    the total test statistic values, in the given target source region/pixel,
    for the (observed) best fit of the model to the data, and the (expected)
    perfect fit of model and data (model = data).

    For consistency in the evaluation of the statistic values, we use the
    basic Fit Statistic functions in Gammapy for Poisson data:

    * `cash <https://docs.gammapy.org/1.1/api/gammapy.stats.cash.html>`_

    * `wstat <https://docs.gammapy.org/1.1/api/gammapy.stats.wstat.html>`_

    For the different types of statistics used in Gammapy for 3D/1D datasets,
    and for our use case of getting the best fit and the perfect fit, we pass
    the appropriate values, adapting the following methods:

    * Best Fit (Observed):

        * `Cash stat_array <https://docs.gammapy.org/1.1/api/gammapy.datasets.MapDataset.html#gammapy.datasets.MapDataset.stat_array # noqa>`_

        * `Wstat stat_array <https://docs.gammapy.org/1.1/api/gammapy.datasets.MapDatasetOnOff.html#gammapy.datasets.MapDatasetOnOff.stat_array # noqa>`_

    * Perfect Fit (Expected):

        * `Cash stat_max <https://docs.gammapy.org/1.1/api/gammapy.stats.CashCountsStatistic.html#gammapy.stats.CashCountsStatistic.stat_max # noqa>`_

        * `Wstat stat_max <https://docs.gammapy.org/1.1/api/gammapy.stats.WStatCountsStatistic.html#gammapy.stats.WStatCountsStatistic.stat_max # noqa>`_

    Parameters
    ----------
    datasets: `gammapy.datasets.Datasets`
        List of Datasets object, which can contain 3D and/or 1D datasets

    Returns
    -------
    stat_best_fit: float
        Total sum of test statistic of the best fit of model to data, summed
        over all energy bins.
    stat_max_fit: float
        Total sum of test statistic of the perfect fit of model to data, summed
        over all energy bins.
    """  # noqa
    stat_best_fit = 0
    stat_max_fit = 0

    for data in datasets:
        if data.stat_type != "chi2":
            # Assuming that the Counts Map is created with the target source as its center
            region = data.counts.geom.center_skydir

            if data.stat_type == "cash":
                counts_on = (data.counts.copy() * data.mask).get_spectrum(region).data
                mu_on = (data.npred() * data.mask).get_spectrum(region).data

                stat_best_fit += np.nansum(cash(n_on=counts_on, mu_on=mu_on).ravel())
                stat_max_fit += np.nansum(cash(n_on=counts_on, mu_on=counts_on).ravel())

            elif data.stat_type == "wstat":
                counts_on = (data.counts.copy() * data.mask).get_spectrum(region).data
                counts_off = np.nan_to_num((data.counts_off * data.mask).get_spectrum(region)).data

                # alpha is normally evaluated from the acceptance ratios, and the
                # background is evaluated with that alpha and counts_off; here
                # alpha is re-evaluated so that it has the same shape
                # (in the target region)
                bkg = np.nan_to_num((data.background * data.mask).get_spectrum(region))

                with np.errstate(invalid="ignore", divide="ignore"):
                    alpha = bkg / counts_off
                mu_signal = np.nan_to_num((data.npred_signal() * data.mask).get_spectrum(region)).data
                max_pred = counts_on - bkg

                stat_best_fit += np.nansum(wstat(n_on=counts_on, n_off=counts_off, alpha=alpha, mu_sig=mu_signal))
                stat_max_fit += np.nansum(wstat(n_on=counts_on, n_off=counts_off, alpha=alpha, mu_sig=max_pred))
        else:
            # For FluxPointsDataset
            stat_best_fit += np.nansum(data.stat_array())
            stat_max_fit += len(data.data.dnde.data)

    return stat_best_fit, stat_max_fit
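

# Illustrative sketch (not part of the original module): the best-fit vs.
# perfect-fit statistics that get_ts_target accumulates, demonstrated on
# made-up counts with the same `cash` function used above. The perfect fit
# sets the predicted counts equal to the observed counts (mu_on = n_on),
# giving the lowest cash statistic achievable for those counts.
def _example_cash_best_vs_perfect_fit():
    counts_on = np.array([12.0, 7.0, 3.0, 1.0])  # observed counts per energy bin
    mu_on = np.array([10.0, 8.0, 2.5, 1.2])  # model-predicted counts (best fit)

    stat_best_fit = np.nansum(cash(n_on=counts_on, mu_on=mu_on))
    stat_max_fit = np.nansum(cash(n_on=counts_on, mu_on=counts_on))
    print(f"Best fit TS: {stat_best_fit:.3f}, perfect fit TS: {stat_max_fit:.3f}")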


def pivot_energy(spectral_model):
    """
    Function taken from the SpectralModel object in the Gammapy 1.2.dev build.
    It will be removed with the new Gammapy release.

    Pivot or decorrelation energy for a given spectral model, calculated numerically.

    It is defined as the energy at which the correlation between the spectral parameters is minimized.

    Returns
    -------
    pivot energy : `~astropy.units.Quantity`
        The energy at which the statistical error in the computed flux is smallest.
        If no minimum is found, NaN will be returned.
    """
    x_unit = spectral_model.reference.unit

    def min_func(x):
        """Function to minimise."""
        x = np.exp(x)
        dnde, dnde_error = spectral_model.evaluate_error(x * x_unit)
        return dnde_error / dnde

    bounds = [np.log(spectral_model.reference.value) - 3, np.log(spectral_model.reference.value) + 3]

    std = np.std(min_func(x=np.linspace(bounds[0], bounds[1], 100)))
    if std < 1e-5:
        print("The relative error on the flux does not depend on energy. No pivot energy found.")
        return np.nan * x_unit

    minimizer = minimize_scalar(min_func, bounds=bounds)

    if not minimizer.success:
        print("No minima found in the relative error on the flux. Pivot energy computation failed.")
        return np.nan * x_unit
    else:
        return np.exp(minimizer.x) * x_unit
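

# Illustrative usage sketch (not part of the original module): the pivot energy
# of a Gammapy PowerLawSpectralModel. The covariance matrix set here is made up;
# in practice it would be filled by a Fit of the model to data.
def _example_pivot_energy():
    from gammapy.modeling.models import PowerLawSpectralModel

    pwl = PowerLawSpectralModel(
        index=2.3, amplitude="1e-12 cm-2 s-1 TeV-1", reference="1 TeV"
    )
    # Assumed covariance for the (index, amplitude, reference) parameters;
    # the off-diagonal term correlates index and amplitude.
    pwl.covariance = np.array(
        [
            [1.0e-2, 1.0e-15, 0.0],
            [1.0e-15, 1.0e-26, 0.0],
            [0.0, 0.0, 0.0],
        ]
    )
    print(pivot_energy(pwl))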


def fetch_pivot_energy(analysis):
    """
    Use an 'AsgardpyAnalysis' object to get the pivot energy for a given dataset
    and fit model, via the pivot_energy function.

    In Gammapy v1.2, use instead
    'analysis.fit_result.models[0].spectral_model.model1.pivot_energy' to get this value.

    Returns
    -------
    pivot energy : `~astropy.units.Quantity`
        The energy at which the statistical error in the computed flux is smallest.
        If no minimum is found, NaN will be returned.
    """
    # Check if DL4 datasets are created, and if not, only run the steps up to Fit
    if len(analysis.datasets) == 0:
        steps = [step for step in analysis.config.general.steps if step != "flux-points"]

        analysis.run(steps)
    else:
        analysis.run(["fit"])

    # If an EBL model is present, the spectral model is a CompoundSpectralModel;
    # use its intrinsic component (model1)
    if isinstance(analysis.datasets[0].models[0].spectral_model, CompoundSpectralModel):
        temp_model = analysis.datasets[0].models[0].spectral_model.model1
    else:
        temp_model = analysis.datasets[0].models[0].spectral_model

    # Fetching the covariance matrix for the given dataset and optimized fit model
    cov_matrix = analysis.fit.covariance(
        datasets=analysis.datasets, optimize_result=analysis.fit_result.optimize_result
    ).matrix

    temp_model.covariance = cov_matrix[: len(temp_model.parameters), : len(temp_model.parameters)]

    return pivot_energy(temp_model)
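

# Illustrative usage sketch (not part of the original module): `analysis` is
# assumed to be an already-configured AsgardpyAnalysis object. With Gammapy
# v1.2 the built-in pivot_energy property mentioned in the docstring above can
# be used instead of this helper.
def _example_fetch_pivot_energy(analysis):
    e_pivot = fetch_pivot_energy(analysis)
    print(f"Pivot (decorrelation) energy: {e_pivot}")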