Passed
Push — master ( fb4dcc...ae5621 )
by Axel
02:38
created

gammapy.modeling.iminuit.MinuitLikelihood.fcn()   A

Complexity

Conditions 2

Size

Total Lines 9
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 6
dl 0
loc 9
rs 10
c 0
b 0
f 0
cc 2
nop 2
1
# Licensed under a 3-clause BSD style license - see LICENSE.rst
2
"""iminuit fitting functions."""
3
import logging
4
import numpy as np
5
from scipy.stats import chi2, norm
6
from .likelihood import Likelihood
7
8
__all__ = ["optimize_iminuit", "covariance_iminuit", "confidence_iminuit", "contour_iminuit"]
9
10
log = logging.getLogger(__name__)
11
12
13
class MinuitLikelihood(Likelihood):
14
    """Likelihood function interface for iminuit."""
15
16
    def fcn(self, *factors):
17
        self.parameters.set_parameter_factors(factors)
18
19
        total_stat = self.function()
20
21
        if self.store_trace:
22
            self.store_trace_iteration(total_stat)
23
24
        return total_stat
25
26
27
def setup_iminuit(parameters, function, store_trace=False, **kwargs):
28
    from iminuit import Minuit
29
30
    minuit_func = MinuitLikelihood(function, parameters, store_trace=store_trace)
31
32
    pars, errors, limits = make_minuit_par_kwargs(parameters)
33
34
    minuit = Minuit(minuit_func.fcn, name=list(pars.keys()), **pars)
35
    minuit.tol = kwargs.pop("tol", 0.1)
36
    minuit.errordef = kwargs.pop("errordef", 1)
37
    minuit.print_level = kwargs.pop("print_level", 0)
38
    minuit.strategy = kwargs.pop("strategy", 1)
39
40
    for name, error in errors.items():
41
        minuit.errors[name] = error
42
43
    for name, limit in limits.items():
44
        minuit.limits[name] = limit
45
46
    return minuit, minuit_func
47
48
49
def optimize_iminuit(parameters, function, store_trace=False, **kwargs):
50
    """iminuit optimization
51
52
    Parameters
53
    ----------
54
    parameters : `~gammapy.modeling.Parameters`
55
        Parameters with starting values
56
    function : callable
57
        Likelihood function
58
    store_trace : bool
59
        Store trace of the fit
60
    **kwargs : dict
61
        Options passed to `iminuit.Minuit` constructor. If there is an entry 'migrad_opts', those options
62
        will be passed to `iminuit.Minuit.migrad()`.
63
64
    Returns
65
    -------
66
    result : (factors, info, optimizer)
67
        Tuple containing the best fit factors, some info and the optimizer instance.
68
    """
69
    migrad_opts = kwargs.pop("migrad_opts", {})
70
71
    minuit, minuit_func = setup_iminuit(
72
        parameters=parameters, function=function, store_trace=store_trace, **kwargs
73
    )
74
75
    minuit.migrad(**migrad_opts)
76
77
    factors = minuit.values
78
    info = {
79
        "success": minuit.valid,
80
        "nfev": minuit.nfcn,
81
        "message": _get_message(minuit, parameters),
82
        "trace": minuit_func.trace,
83
    }
84
    optimizer = minuit
85
86
    return factors, info, optimizer
87
88
89
def covariance_iminuit(parameters, function, **kwargs):
90
    minuit = kwargs["minuit"]
91
92
    if minuit is None:
93
        minuit, _ = setup_iminuit(
94
            parameters=parameters, function=function, store_trace=False, **kwargs
95
        )
96
        minuit.hesse()
97
98
    message, success = "Hesse terminated successfully.", True
99
100
    try:
101
        covariance_factors = np.array(minuit.covariance)
102
    except (TypeError, RuntimeError):
103
        N = len(minuit.values)
104
        covariance_factors = np.nan * np.ones((N, N))
105
        message, success = "Hesse failed", False
106
107
    return covariance_factors, {"success": success, "message": message}
108
109
110
def confidence_iminuit(parameters, function, parameter, reoptimize, sigma, **kwargs):
111
    # TODO: this is ugly - design something better for translating to MINUIT parameter names.
112
    if not reoptimize:
113
        log.warning("Reoptimize = False ignored for iminuit backend")
114
115
    minuit, minuit_func = setup_iminuit(
116
        parameters=parameters, function=function, store_trace=False, **kwargs
117
    )
118
    migrad_opts = kwargs.get("migrad_opts", {})
119
    minuit.migrad(**migrad_opts)
120
121
    # Maybe a wrapper class MinuitParameters?
122
    parameter = parameters[parameter]
123
    idx = parameters.free_parameters.index(parameter)
124
    var = _make_parname(idx, parameter)
125
126
    message = "Minos terminated successfully."
127
    cl = 2 * norm.cdf(sigma) - 1
128
129
    try:
130
        minuit.minos(var, cl=cl, ncall=None)
131
        info = minuit.merrors[var]
132
    except (AttributeError, RuntimeError) as error:
133
        return {
134
            "success": False,
135
            "message": str(error),
136
            "errp": np.nan,
137
            "errn": np.nan,
138
            "nfev": 0,
139
        }
140
141
    return {
142
        "success": info.is_valid,
143
        "message": message,
144
        "errp": info.upper,
145
        "errn": -info.lower,
146
        "nfev": info.nfcn,
147
    }
148
149
150
def contour_iminuit(parameters, function, x, y, numpoints, sigma, **kwargs):
151
    minuit, minuit_func = setup_iminuit(
152
        parameters=parameters, function=function, store_trace=False, **kwargs
153
    )
154
    minuit.migrad()
155
156
    par_x = parameters[x]
157
    idx_x = parameters.free_parameters.index(par_x)
158
    x = _make_parname(idx_x, par_x)
159
160
    par_y = parameters[y]
161
    idx_y = parameters.free_parameters.index(par_y)
162
    y = _make_parname(idx_y, par_y)
163
164
    cl = chi2(2).cdf(sigma ** 2)
165
    contour = minuit.mncontour(x=x, y=y, size=numpoints, cl=cl)
166
    # TODO: add try and except to get the success
167
    return {
168
        "success": True,
169
        "x": contour[:, 0],
170
        "y": contour[:, 1],
171
    }
172
173
174
# this code is copied from https://github.com/iminuit/iminuit/blob/master/iminuit/_minimize.py#L95
175
def _get_message(m, parameters):
176
    message = "Optimization terminated successfully."
177
    success = m.accurate
178
    success &= np.all(np.isfinite([par.value for par in parameters]))
179
    if not success:
180
        message = "Optimization failed."
181
        fmin = m.fmin
182
        if fmin.has_reached_call_limit:
183
            message += " Call limit was reached."
184
        if fmin.is_above_max_edm:
185
            message += " Estimated distance to minimum too large."
186
    return message
187
188
189
def _make_parnames(parameters):
190
    return [_make_parname(idx, par) for idx, par in enumerate(parameters)]
191
192
193
def _make_parname(idx, par):
194
    return f"par_{idx:03d}_{par.name}"
195
196
197
def make_minuit_par_kwargs(parameters):
198
    """Create *Parameter Keyword Arguments* for the `Minuit` constructor.
199
200
    See: http://iminuit.readthedocs.io/en/latest/api.html#iminuit.Minuit
201
    """
202
    names = _make_parnames(parameters.free_parameters)
203
    pars, errors, limits = {}, {}, {}
204
205
    for name, par in zip(names, parameters.free_parameters):
206
        pars[name] = par.factor
207
208
        min_ = None if np.isnan(par.factor_min) else par.factor_min
209
        max_ = None if np.isnan(par.factor_max) else par.factor_max
210
        limits[name] = (min_, max_)
211
212
        if par.error == 0 or np.isnan(par.error):
213
            error = 1
214
        else:
215
            error = par.error / par.scale
216
        errors[name] = error
217
218
    return pars, errors, limits
219