Completed
Push — master ( 4fc32f...027495 )
by Andy
01:22
created

CannonModel   A

Complexity

Total Complexity 21

Size/Duplication

Total Lines 226
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
wmc 21
dl 0
loc 226
rs 10
c 1
b 0
f 0

6 Methods

Rating   Name   Duplication   Size   Complexity  
A predict() 0 16 1
C _fit() 0 68 8
B fit() 0 41 4
C train() 0 51 7
A function() 0 4 2
A __init__() 0 2 1
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
4
"""
5
A pedestrian version of The Cannon.
6
"""
7
8
from __future__ import (division, print_function, absolute_import,
9
                        unicode_literals)
10
11
__all__ = ["CannonModel"]
12
13
import logging
14
import numpy as np
15
import scipy.optimize as op
16
17
from . import (model, utils)
18
19
logger = logging.getLogger(__name__)
20
21
22
class CannonModel(model.BaseCannonModel):
23
    """
24
    A generalised Cannon model for the estimation of arbitrary stellar labels.
25
26
    :param labels:
27
        A table with columns as labels, and stars as rows.
28
29
    :type labels:
30
        :class:`~astropy.table.Table` or numpy structured array
31
32
    :param fluxes:
33
        An array of fluxes for stars in the training set, given as shape
34
        `(num_stars, num_pixels)`. The `num_stars` should match the number of
35
        rows in `labels`.
36
37
    :type fluxes:
38
        :class:`np.ndarray`
39
40
    :param flux_uncertainties:
41
        An array of 1-sigma flux uncertainties for stars in the training set,
42
        The shape of the `flux_uncertainties` should match `fluxes`. 
43
44
    :type flux_uncertainties:
45
        :class:`np.ndarray`
46
47
    :param dispersion: [optional]
48
        The dispersion values corresponding to the given pixels. If provided, 
49
        this should have length `num_pixels`.
50
51
    :param live_dangerously: [optional]
52
        If enabled then no checks will be made on the label names, prohibiting
53
        the user to input human-readable forms of the label vector.
54
    """
55
56
    _descriptive_attributes = ["_label_vector"]
57
    _trained_attributes = ["_coefficients", "_scatter", "_pivots"]
58
    _data_attributes = \
59
        ["training_labels", "training_fluxes", "training_flux_uncertainties"]
60
    
61
    def __init__(self, *args, **kwargs):
62
        super(CannonModel, self).__init__(*args, **kwargs)
63
64
65
    @model.requires_model_description
66
    def train(self, **kwargs):
67
        """
68
        Train the model based on the training set and the description of the
69
        label vector.
70
        """
71
        
72
        # Initialise the scatter and coefficient arrays.
73
        N_px = len(self.dispersion)
74
        scatter = np.nan * np.ones(N_px)
75
        label_vector_array = self.label_vector_array
76
        theta = np.nan * np.ones((N_px, label_vector_array.shape[0]))
77
78
        # Details for the progressbar.
79
        pb_kwds = {
80
            "message": "Training Cannon model from {0} stars with {1} pixels "
81
                       "each".format(len(self.training_labels), N_px),
82
            "size": 100 if kwargs.pop("progressbar", True) else -1
83
        }
84
        
85
        if self.pool is None:
86
            for pixel in utils.progressbar(range(N_px), **pb_kwds):
87
                theta[pixel, :], scatter[pixel] = _fit_pixel(
88
                    self.training_fluxes[:, pixel], 
89
                    self.training_flux_uncertainties[:, pixel],
90
                    label_vector_array, **kwargs)
91
92
        else:
93
            # Not as nice as just mapping, but necessary for a progress bar.
94
            process = { pixel: self.pool.apply_async(
95
                    _fit_pixel,
96
                    args=(
97
                        self.training_fluxes[:, pixel], 
98
                        self.training_flux_uncertainties[:, pixel],
99
                        label_vector_array
100
                    ),
101
                    kwds=kwargs) \
102
                for pixel in range(N_px) }
103
104
            for pixel, proc in utils.progressbar(process.items(), **pb_kwds):
105
                theta[pixel, :], scatter[pixel] = proc.get()
106
107
        if np.std(scatter) == 0:
108
            logger.warning("All pixels show the same level of variance!"
109
                           " (Something probably went very, very wrong)")
110
111
        # Save the trained state.            
112
        self.coefficients, self.scatter = (theta, scatter)
113
        self._trained = True
114
115
        return (theta, scatter)
116
117
118
    @model.requires_training_wheels
119
    def predict(self, labels=None, **labels_as_kwargs):
120
        """
121
        Predict spectra from the trained model, given the labels.
122
123
        :param labels: [optional]
124
            The labels required for the trained model. This should be a N-length
125
            list matching the number of unique terms in the model, in the order
126
            given by `self.labels`. Alternatively, labels can be explicitly 
127
            given as keyword arguments.
128
        """
129
130
        labels = self._format_input_labels(labels, **labels_as_kwargs)
131
132
        return np.dot(self.coefficients, model._build_label_vector_rows(
133
            self.label_vector, labels, dict(zip(self.labels, self.pivots))
134
            )).flatten()
135
136
137
    @model.requires_training_wheels
138
    def fit(self, fluxes, flux_uncertainties, **kwargs):
139
        """
140
        Solve the labels for given pixel fluxes and uncertainties.
141
142
        :param fluxes:
143
            The normalised fluxes. These should be on the same dispersion scale
144
            as the trained data.
145
146
        :param flux_uncertainties:
147
            The 1-sigma uncertainties in the fluxes. This should have the same
148
            shape as `fluxes`.
149
150
        :returns:
151
            The labels and covariance matrix.
152
        """
153
154
        label_indices = self._get_lowest_order_label_indices()
155
        fluxes, flux_uncertainties = map(np.array, (fluxes, flux_uncertainties))
156
157
        # TODO: Consider parallelising this, which would mean factoring
158
        # _fit out of the model class, which gets messy.
159
        # Since solving for labels is not a big bottleneck (yet), let's leave
160
        # this.
161
162
        full_output = kwargs.pop("full_output", False)
163
        if fluxes.ndim == 1:
164
            labels, covariance = \
165
                self._fit(fluxes, flux_uncertainties, label_indices, **kwargs)
166
        else:
167
            N_stars, N_labels = (fluxes.shape[0], len(self.labels))
168
            labels = np.empty((N_stars, N_labels), dtype=float)
169
            covariance = np.empty((N_stars, N_labels, N_labels), dtype=float)
170
171
            for i, (f, u) in enumerate(zip(fluxes, flux_uncertainties)):
172
                labels[i, :], covariance[i, :] = \
173
                    self._fit(f, u, label_indices, **kwargs)
174
175
        if full_output:
176
            return (labels, covariance)
177
        return labels
178
179
180
    def _fit(self, fluxes, flux_uncertainties, label_indices, **kwargs):
181
        """
182
        Solve the labels for given pixel fluxes and uncertainties
183
        for a single star.
184
185
        :param fluxes:
186
            The normalised fluxes. These should be on the same dispersion scale
187
            as the trained data.
188
189
        :param flux_uncertainties:
190
            The 1-sigma uncertainties in the fluxes. This should have the same
191
            shape as `fluxes`.
192
193
        :returns:
194
            The labels and covariance matrix.
195
        """
196
197
        # Check which pixels to use, then just use those.
198
        use = (flux_uncertainties < kwargs.get("max_uncertainty", 1)) \
199
            * np.isfinite(self.coefficients[:, 0] * fluxes * flux_uncertainties)
200
201
        fluxes = fluxes.copy()[use]
202
        flux_uncertainties = flux_uncertainties.copy()[use]
203
        scatter, coefficients = self.scatter[use], self.coefficients[use]
204
205
        Cinv = 1.0 / (scatter**2 + flux_uncertainties**2)
206
        A = np.dot(coefficients.T, Cinv[:, None] * coefficients)
207
        B = np.dot(coefficients.T, Cinv * fluxes)
208
        theta_p0 = np.linalg.solve(A, B)
209
210
        # Need to match the initial theta coefficients back to label values.
211
        # (Maybe this should use some general non-linear simultaneous solver?)
212
        initial = {}
213
        for index in label_indices:
214
            if index is None: continue
215
            label, order = self.label_vector[index][0]
216
            # The +1 index offset is because the first theta is a scaling.
217
            value = abs(theta_p0[1 + index])**(1./order)
218
            if not np.isfinite(value): continue
219
            initial[label] = value
220
221
        # There could be some coefficients that are only used in cross-terms.
222
        # We could solve for them, or just take them as zero (i.e., near the
223
        # pivot point of the data set).
224
        missing = set(self.labels).difference(initial)
225
        initial.update({ label: 0.0 for label in missing })
226
227
        # Create and test the generating function.
228
        def function(coeffs, *labels):
229
            return np.dot(coeffs, 
230
                model._build_label_vector_rows(self.label_vector, 
231
                    { label: [v] for label, v in zip(self.labels, labels) }
232
                )).flatten()
233
234
        # Solve for the parameters.
235
        kwds = {
236
            "p0": np.array([initial[label] for label in self.labels]),
237
            "maxfev": 10000,
238
            "sigma": 1.0/np.sqrt(Cinv),
239
            "absolute_sigma": True
240
        }
241
        kwds.update(kwargs)
242
        labels_opt, cov = op.curve_fit(function, coefficients, fluxes, **kwds)
243
244
        # Apply any necessary pivots to put these back to real space.   
245
        labels_opt += self.pivots
246
        
247
        return (labels_opt, cov)
248
249
250 View Code Duplication
def _fit_pixel(fluxes, flux_uncertainties, label_vector_array, **kwargs):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
251
    """
252
    Return the optimal label vector coefficients and scatter for a pixel, given
253
    the fluxes, uncertainties, and the label vector array.
254
255
    :param fluxes:
256
        The fluxes for the given pixel, from all stars.
257
258
    :param flux_uncertainties:
259
        The 1-sigma flux uncertainties for the given pixel, from all stars.
260
261
    :param label_vector_array:
262
        The label vector array. This should have shape `(N_stars, N_terms + 1)`.
263
264
    :returns:
265
        The optimised label vector coefficients and scatter for this pixel.
266
    """
267
268
    _ = kwargs.get("max_uncertainty", 1)
269
    failed_response = (np.nan * np.ones(label_vector_array.shape[0]), _)
270
    if np.all(flux_uncertainties >= _):
271
        return failed_response
272
273
    # Get an initial guess of the scatter.
274
    scatter = np.var(fluxes) - np.median(flux_uncertainties)**2
275
    scatter = np.sqrt(scatter) if scatter >= 0 else np.std(fluxes)
276
    
277
    # Optimise the scatter, and at each scatter value we will calculate the
278
    # optimal vector coefficients.
279
    op_scatter, fopt, direc, n_iter, n_funcs, warnflag = op.fmin_powell(
280
        _pixel_scatter_nll, scatter,
281
        args=(fluxes, flux_uncertainties, label_vector_array),
282
        disp=False, full_output=True)
283
284
    if warnflag > 0:
285
        logger.warning("Warning: {}".format([
286
            "Maximum number of function evaluations made during optimisation.",
287
            "Maximum number of iterations made during optimisation."
288
            ][warnflag - 1]))
289
290
    # Calculate the coefficients at the optimal scatter value.
291
    # Note that if we can't solve for the coefficients, we should just set them
292
    # as zero and send back a giant variance.
293
    try:
294
        coefficients, ATCiAinv, variance = _fit_coefficients(
295
            fluxes, flux_uncertainties, op_scatter, label_vector_array)
296
297
    except np.linalg.linalg.LinAlgError:
298
        logger.exception("Failed to calculate coefficients")
299
        if kwargs.get("debug", False): raise
300
301
        return failed_response
302
303
    else:
304
        return (coefficients, op_scatter)
305
306
307
def _pixel_scatter_nll(scatter, fluxes, flux_uncertainties, label_vector_array,
308
    **kwargs):
309
    """
310
    Return the negative log-likelihood for the scatter in a single pixel.
311
312
    :param scatter:
313
        The model scatter in the pixel.
314
315
    :param fluxes:
316
        The fluxes for a given pixel (in many stars).
317
318
    :param flux_uncertainties:
319
        The 1-sigma uncertainties in the fluxes for a given pixel. This should
320
        have the same shape as `fluxes`.
321
322
    :param label_vector_array:
323
        The label vector array for each star, for the given pixel.
324
325
    :returns:
326
        The log-likelihood of the log scatter, given the fluxes and the label
327
        vector array.
328
329
    :raises np.linalg.linalg.LinAlgError:
330
        If there was an error in inverting a matrix, and `debug` is set to True.
331
    """
332
333
    if 0 > scatter:
334
        return np.inf
335
336
    try:
337
        # Calculate the coefficients for the given level of scatter.
338
        theta, ATCiAinv, variance = _fit_coefficients(
339
            fluxes, flux_uncertainties, scatter, label_vector_array)
340
341
    except np.linalg.linalg.LinAlgError:
342
        if kwargs.get("debug", False): raise
343
        return np.inf
344
345
    model = np.dot(theta, label_vector_array)
346
347
    return 0.5 * np.sum((fluxes - model)**2 / variance) \
348
        +  0.5 * np.sum(np.log(variance))
349
350
351
def _fit_coefficients(fluxes, flux_uncertainties, scatter, label_vector_array):
352
    """
353
    Fit model coefficients and scatter to a given set of normalised fluxes for a
354
    single pixel.
355
356
    :param fluxes:
357
        The normalised fluxes for a single pixel (in many stars).
358
359
    :param flux_uncertainties:
360
        The 1-sigma uncertainties in normalised fluxes. This should have the
361
        same shape as `fluxes`.
362
363
    :param label_vector_array:
364
        The label vector array for each pixel.
365
366
    :returns:
367
        The label vector coefficients for the pixel, the inverse variance matrix
368
        and the total pixel variance.
369
    """
370
371
    variance = flux_uncertainties**2 + scatter**2
372
    CiA = label_vector_array.T * \
373
        np.tile(1./variance, (label_vector_array.shape[0], 1)).T
374
    ATCiAinv = np.linalg.inv(np.dot(label_vector_array, CiA))
375
376
    ATY = np.dot(label_vector_array, fluxes/variance)
377
    theta = np.dot(ATCiAinv, ATY)
378
379
    return (theta, ATCiAinv, variance)
380