Completed: push to master (83b528...b1912a) by Andy, analysed in 58s.

AnniesLasso.CannonModel.function(): grade A

Complexity:   Conditions 2
Size:         Total Lines 4
Duplication:  Lines 0, Ratio 0 %

Metric   Value
dl       0
loc      4
rs       10
cc       2
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
A pedestrian version of The Cannon.
"""

from __future__ import (division, print_function, absolute_import,
                        unicode_literals)

__all__ = ["CannonModel"]

import logging
import numpy as np
import scipy.optimize as op

from . import (model, utils)

logger = logging.getLogger(__name__)


class CannonModel(model.BaseCannonModel):
    """
    A generalised Cannon model for the estimation of arbitrary stellar labels.

    :param labels:
        A table with columns as labels, and stars as rows.

    :type labels:
        :class:`~astropy.table.Table` or numpy structured array

    :param fluxes:
        An array of fluxes for stars in the training set, given as shape
        `(num_stars, num_pixels)`. The `num_stars` should match the number of
        rows in `labels`.

    :type fluxes:
        :class:`np.ndarray`

    :param flux_uncertainties:
        An array of 1-sigma flux uncertainties for stars in the training set.
        The shape of `flux_uncertainties` should match that of `fluxes`.

    :type flux_uncertainties:
        :class:`np.ndarray`

    :param dispersion: [optional]
        The dispersion values corresponding to the given pixels. If provided,
        this should have length `num_pixels`.

    :param live_dangerously: [optional]
        If enabled, no checks will be made on the label names, preventing the
        user from providing human-readable forms of the label vector.
    """

    _descriptive_attributes = ["_label_vector"]
    _trained_attributes = ["_coefficients", "_scatter", "_pivots"]
    _data_attributes = \
        ["training_labels", "training_fluxes", "training_flux_uncertainties"]

    def __init__(self, *args, **kwargs):
        super(CannonModel, self).__init__(*args, **kwargs)


    @model.requires_model_description
    def train(self, **kwargs):
        """
        Train the model based on the training set and the description of the
        label vector.
        """

        # Initialise the scatter and coefficient arrays.
        N_px = len(self.dispersion)
        scatter = np.nan * np.ones(N_px)
        label_vector_array = self.label_vector_array
        theta = np.nan * np.ones((N_px, label_vector_array.shape[0]))

        # Details for the progressbar.
        pb_kwds = {
            "message": "Training Cannon model from {0} stars with {1} pixels "
                       "each".format(len(self.training_labels), N_px),
            "size": 100 if kwargs.pop("progressbar", True) else -1
        }

        if self.pool is None:
            for pixel in utils.progressbar(range(N_px), **pb_kwds):
                theta[pixel, :], scatter[pixel] = _fit_pixel(
                    self.training_fluxes[:, pixel],
                    self.training_flux_uncertainties[:, pixel],
                    label_vector_array, **kwargs)

        else:
            # Not as nice as just mapping, but necessary for a progress bar.
            process = { pixel: self.pool.apply_async(
                    _fit_pixel,
                    args=(
                        self.training_fluxes[:, pixel],
                        self.training_flux_uncertainties[:, pixel],
                        label_vector_array
                    ),
                    kwds=kwargs) \
                for pixel in range(N_px) }

            for pixel, proc in utils.progressbar(process.items(), **pb_kwds):
                theta[pixel, :], scatter[pixel] = proc.get()

        # Save the trained state.
        self.coefficients, self.scatter = (theta, scatter)
        self._trained = True

        return (theta, scatter)


    @model.requires_training_wheels
    def predict(self, labels=None, **labels_as_kwargs):
        """
        Predict spectra from the trained model, given the labels.

        :param labels: [optional]
            The labels required for the trained model. This should be an
            N-length list matching the number of unique labels in the model,
            in the order given by `self.labels`. Alternatively, labels can be
            explicitly given as keyword arguments.
        """

        labels = self._format_input_labels(labels, **labels_as_kwargs)

        return np.dot(self.coefficients, model._build_label_vector_rows(
            self.label_vector, labels, dict(zip(self.labels, self.pivots))
            )).flatten()


    @model.requires_training_wheels
    def fit(self, fluxes, flux_uncertainties, **kwargs):
        """
        Solve for the labels, given pixel fluxes and uncertainties.

        :param fluxes:
            The normalised fluxes. These should be on the same dispersion scale
            as the training data.

        :param flux_uncertainties:
            The 1-sigma uncertainties in the fluxes. This should have the same
            shape as `fluxes`.

        :returns:
            The labels, and the covariance matrix if `full_output` is set.
        """

        label_indices = self._get_lowest_order_label_indices()
        fluxes, flux_uncertainties = map(np.array, (fluxes, flux_uncertainties))

        # TODO: Consider parallelising this, which would mean factoring
        # _fit out of the model class, which gets messy.
        # Since solving for labels is not a big bottleneck (yet), let's leave
        # this.

        full_output = kwargs.pop("full_output", False)
        if fluxes.ndim == 1:
            labels, covariance = \
                self._fit(fluxes, flux_uncertainties, label_indices, **kwargs)
        else:
            N_stars, N_labels = (fluxes.shape[0], len(self.labels))
            labels = np.empty((N_stars, N_labels), dtype=float)
            covariance = np.empty((N_stars, N_labels, N_labels), dtype=float)

            for i, (f, u) in enumerate(zip(fluxes, flux_uncertainties)):
                labels[i, :], covariance[i, :] = \
                    self._fit(f, u, label_indices, **kwargs)

        if full_output:
            return (labels, covariance)
        return labels


    def _fit(self, fluxes, flux_uncertainties, label_indices, **kwargs):
        """
        Solve for the labels of a single star, given its pixel fluxes and
        uncertainties.

        :param fluxes:
            The normalised fluxes. These should be on the same dispersion scale
            as the training data.

        :param flux_uncertainties:
            The 1-sigma uncertainties in the fluxes. This should have the same
            shape as `fluxes`.

        :param label_indices:
            The indices of the lowest-order label vector terms, used to form
            initial guesses of the labels.

        :returns:
            The labels and covariance matrix.
        """

        # Check which pixels to use, then just use those.
        use = (flux_uncertainties < kwargs.get("max_uncertainty", 1)) \
            * np.isfinite(self.coefficients[:, 0] * fluxes * flux_uncertainties)
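
        # (The mask keeps pixels whose reported uncertainty is below the
        # `max_uncertainty` threshold and whose flux, uncertainty, and leading
        # model coefficient are all finite; everything else is excluded.)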

        fluxes = fluxes.copy()[use]
        flux_uncertainties = flux_uncertainties.copy()[use]
        scatter, coefficients = self.scatter[use], self.coefficients[use]

        Cinv = 1.0 / (scatter**2 + flux_uncertainties**2)
        A = np.dot(coefficients.T, Cinv[:, None] * coefficients)
        B = np.dot(coefficients.T, Cinv * fluxes)
        theta_p0 = np.linalg.solve(A, B)
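
        # (theta_p0 is the weighted least-squares solution
        #  (X^T C^-1 X)^-1 X^T C^-1 f, where X holds the trained per-pixel
        #  coefficients, C is the diagonal pixel variance (scatter plus
        #  measurement), and f are the observed fluxes. It is only used to
        #  seed the initial label guesses below.)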

        # Need to match the initial theta coefficients back to label values.
        # (Maybe this should use some general non-linear simultaneous solver?)
        initial = {}
        for index in label_indices:
            if index is None: continue
            label, order = self.label_vector[index][0]
            # The +1 index offset is because the first theta is a scaling.
            value = abs(theta_p0[1 + index])**(1./order)
            if not np.isfinite(value): continue
            initial[label] = value

        # There could be some coefficients that are only used in cross-terms.
        # We could solve for them, or just take them as zero (i.e., near the
        # pivot point of the data set).
        missing = set(self.labels).difference(initial)
        initial.update({ label: 0.0 for label in missing })

        # Create and test the generating function.
        def function(coeffs, *labels):
            return np.dot(coeffs,
                model._build_label_vector_rows(self.label_vector,
                    { label: [v] for label, v in zip(self.labels, labels) }
                )).flatten()
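
        # (In the curve_fit call below, the trained coefficient matrix plays
        # the role of the independent variable and the labels are the free
        # parameters optimised against the observed fluxes.)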

        # Solve for the parameters.
        kwds = {
            "p0": np.array([initial[label] for label in self.labels]),
            "maxfev": 10000,
            "sigma": 1.0/np.sqrt(Cinv),
            "absolute_sigma": True
        }
        kwds.update(kwargs)
        labels_opt, cov = op.curve_fit(function, coefficients, fluxes, **kwds)

        # Apply any necessary pivots to put these back to real space.
        labels_opt += self.pivots

        return (labels_opt, cov)


def _fit_pixel(fluxes, flux_uncertainties, label_vector_array, **kwargs):
    """
    Return the optimal label vector coefficients and scatter for a pixel, given
    the fluxes, uncertainties, and the label vector array.

    :param fluxes:
        The fluxes for the given pixel, from all stars.

    :param flux_uncertainties:
        The 1-sigma flux uncertainties for the given pixel, from all stars.

    :param label_vector_array:
        The label vector array. This should have shape `(N_terms + 1, N_stars)`.

    :returns:
        The optimised label vector coefficients and scatter for this pixel.
    """

    max_uncertainty = kwargs.get("max_uncertainty", 1)
    failed_response = (np.nan * np.ones(label_vector_array.shape[0]),
        max_uncertainty)
    if np.all(flux_uncertainties > max_uncertainty):
        return failed_response

    # Get an initial guess of the scatter.
    scatter = np.var(fluxes) - np.median(flux_uncertainties)**2
    scatter = np.sqrt(scatter) if scatter >= 0 else np.std(fluxes)
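
    # (The guess above is the observed flux variance in excess of the median
    #  measurement variance, square-rooted; if that excess is negative, the raw
    #  standard deviation of the fluxes is used instead.)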

    # Optimise the scatter, and at each scatter value we will calculate the
    # optimal vector coefficients.
    op_scatter, fopt, direc, n_iter, n_funcs, warnflag = op.fmin_powell(
        _pixel_scatter_nll, scatter,
        args=(fluxes, flux_uncertainties, label_vector_array),
        disp=False, full_output=True)

    if warnflag > 0:
        logger.warning("Warning: {}".format([
            "Maximum number of function evaluations made during optimisation.",
            "Maximum number of iterations made during optimisation."
            ][warnflag - 1]))

    # Calculate the coefficients at the optimal scatter value.
    # Note that if we can't solve for the coefficients, we just return the
    # failed response (NaN coefficients).
    try:
        coefficients, ATCiAinv, variance = _fit_coefficients(
            fluxes, flux_uncertainties, op_scatter, label_vector_array)

    except np.linalg.linalg.LinAlgError:
        logger.exception("Failed to calculate coefficients")
        if kwargs.get("debug", False): raise

        return failed_response

    else:
        return (coefficients, op_scatter)


def _pixel_scatter_nll(scatter, fluxes, flux_uncertainties, label_vector_array,
    **kwargs):
    """
    Return the negative log-likelihood for the scatter in a single pixel.

    :param scatter:
        The model scatter in the pixel.

    :param fluxes:
        The fluxes for a given pixel (in many stars).

    :param flux_uncertainties:
        The 1-sigma uncertainties in the fluxes for a given pixel. This should
        have the same shape as `fluxes`.

    :param label_vector_array:
        The label vector array for each star, for the given pixel.

    :returns:
        The negative log-likelihood of the scatter, given the fluxes and the
        label vector array.

    :raises np.linalg.linalg.LinAlgError:
        If there was an error in inverting a matrix, and `debug` is set to True.
    """

    if 0 > scatter:
        return np.inf

    try:
        # Calculate the coefficients for the given level of scatter.
        theta, ATCiAinv, variance = _fit_coefficients(
            fluxes, flux_uncertainties, scatter, label_vector_array)

    except np.linalg.linalg.LinAlgError:
        if kwargs.get("debug", False): raise
        return np.inf

    model = np.dot(theta, label_vector_array)
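
    # (Up to an additive constant, the Gaussian negative log-likelihood is
    #  0.5 * sum((fluxes - model)^2 / variance) + 0.5 * sum(ln(variance)),
    #  with variance = flux_uncertainties^2 + scatter^2 from _fit_coefficients.)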

    return 0.5 * np.sum((fluxes - model)**2 / variance) \
        + 0.5 * np.sum(np.log(variance))


def _fit_coefficients(fluxes, flux_uncertainties, scatter, label_vector_array):
    """
    Fit model coefficients to a given set of normalised fluxes for a single
    pixel, at a given scatter.

    :param fluxes:
        The normalised fluxes for a single pixel (in many stars).

    :param flux_uncertainties:
        The 1-sigma uncertainties in normalised fluxes. This should have the
        same shape as `fluxes`.

    :param scatter:
        The model scatter in the pixel.

    :param label_vector_array:
        The label vector array for each star.

    :returns:
        The label vector coefficients for the pixel, the covariance matrix of
        those coefficients, and the total pixel variance.
    """

    variance = flux_uncertainties**2 + scatter**2
    CiA = label_vector_array.T * \
        np.tile(1./variance, (label_vector_array.shape[0], 1)).T
    ATCiAinv = np.linalg.inv(np.dot(label_vector_array, CiA))

    ATY = np.dot(label_vector_array, fluxes/variance)
    theta = np.dot(ATCiAinv, ATY)

    return (theta, ATCiAinv, variance)