Completed
Push — master (0c6327...53c124) by Andy, created 01:07

AnniesLasso.RegularizedCannonModel (rating: A)

Complexity
    Total Complexity    19

Size/Duplication
    Total Lines         144
    Duplicated Lines    0 %

Metric    Value
wmc       19
dl        0
loc       144
rs        10

4 Methods

Rating    Name                              Duplication    Size    Complexity
A         regularization()                  0              3       1
A         __init__()                        0              2       1
B         train()                           0              45      6
A         conservative_cross_validation()   0              19      1
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
A regularized (compressed sensing) version of The Cannon.
"""

from __future__ import (division, print_function, absolute_import,
                        unicode_literals)

__all__ = ["RegularizedCannonModel"]

import logging
import numpy as np
import scipy.optimize as op

from . import (cannon, model, utils)

logger = logging.getLogger(__name__)

class RegularizedCannonModel(cannon.CannonModel):
    """
    An L1-regularized edition of The Cannon model for the estimation of
    arbitrary stellar labels.

    :param labels:
        A table with columns as labels, and stars as rows.

    :type labels:
        :class:`~astropy.table.Table` or numpy structured array

    :param fluxes:
        An array of fluxes for stars in the training set, given as shape
        `(num_stars, num_pixels)`. The `num_stars` should match the number of
        rows in `labels`.

    :type fluxes:
        :class:`np.ndarray`

    :param flux_uncertainties:
        An array of 1-sigma flux uncertainties for stars in the training set.
        The shape of `flux_uncertainties` should match that of `fluxes`.

    :type flux_uncertainties:
        :class:`np.ndarray`

    :param dispersion: [optional]
        The dispersion values corresponding to the given pixels. If provided,
        this should have length `num_pixels`.

    :param live_dangerously: [optional]
        If enabled, no checks will be made on the label names, which prevents
        the user from providing human-readable forms of the label vector.
    """

    _descriptive_attributes = ["_label_vector", "_regularization"]

    def __init__(self, *args, **kwargs):
        super(RegularizedCannonModel, self).__init__(*args, **kwargs)
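
    # Illustrative usage sketch (added annotation, not part of the original
    # source). The argument names follow the class docstring above; a label
    # vector description is assumed to have been set on the model beforehand
    # (see the requires_model_description decorator on train()):
    #
    #   >>> model = RegularizedCannonModel(labels, fluxes, flux_uncertainties,
    #   ...     dispersion=dispersion)
    #   >>> model.regularization = 1000.0
    #   >>> theta, scatter = model.train()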

    @property
    def regularization(self):
        return self._regularization


    @regularization.setter
    def regularization(self, regularization):
        if regularization is None:
            self._regularization = None
            return None

        # Can be a single positive value, or an array of positive values
        # (one per pixel).
        try:
            regularization = float(regularization)
        except (TypeError, ValueError):
            regularization = np.array(regularization).flatten()

            if regularization.size != len(self.dispersion):
                raise ValueError("regularization must be a positive value or "
                                 "an array of positive values for each pixel "
                                 "({0} != {1})".format(
                                    regularization.size,
                                    len(self.dispersion)))

            if any(0 > regularization) \
            or not np.all(np.isfinite(regularization)):
                raise ValueError("regularization terms must be "
                                 "positive and finite")
        else:
            if 0 > regularization or not np.isfinite(regularization):
                raise ValueError("regularization term must be "
                                 "positive and finite")
            regularization = np.ones_like(self.dispersion) * regularization
        self._regularization = regularization
        return None
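
    # Example sketch (added annotation, not from the original source): the
    # setter above accepts either a scalar or a per-pixel array, and always
    # stores a per-pixel array internally, e.g.
    #
    #   >>> model.regularization = 1000.0   # broadcast to every pixel
    #   >>> model.regularization = np.ones(len(model.dispersion)) * 1000.0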

    @model.requires_model_description
    def train(self, **kwargs):
        """
        Train the model based on the training set and the description of the
        label vector, and enforce regularization.
        """

        # Initialise the scatter and coefficient arrays.
        N_pixels = len(self.dispersion)
        scatter = np.nan * np.ones(N_pixels)
        label_vector_array = self.label_vector_array
        theta = np.nan * np.ones((N_pixels, label_vector_array.shape[0]))

        # Details for the progressbar.
        pb_kwds = {
            "message": "Training regularized Cannon model from {0} stars with "\
                       "{1} pixels each".format(
                           len(self.training_labels), N_pixels),
            "size": 100 if kwargs.pop("progressbar", True) else -1
        }

        if self.pool is None:
            for pixel in utils.progressbar(range(N_pixels), **pb_kwds):
                theta[pixel, :], scatter[pixel] = _fit_pixel(
                    self.training_fluxes[:, pixel],
                    self.training_flux_uncertainties[:, pixel],
                    label_vector_array, self.regularization[pixel],
                    **kwargs)

        else:
            # Not as nice as just mapping, but necessary for a progress bar.
            process = { pixel: self.pool.apply_async(_fit_pixel, args=(
                    self.training_fluxes[:, pixel],
                    self.training_flux_uncertainties[:, pixel],
                    label_vector_array, self.regularization[pixel]
                ), kwds=kwargs) \
                for pixel in range(N_pixels) }

            for pixel, proc in utils.progressbar(process.items(), **pb_kwds):
                theta[pixel, :], scatter[pixel] = proc.get()

        self.coefficients, self.scatter = (theta, scatter)
        self._trained = True

        return (theta, scatter)
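
    # Note (added annotation, not part of the original source): `theta` has
    # shape (N_pixels, label_vector_array.shape[0]) -- one row of label vector
    # coefficients per pixel -- and `scatter` has shape (N_pixels,); both are
    # also stored on the model as `self.coefficients` and `self.scatter`.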

    def conservative_cross_validation(self, **kwargs):
        """
        Perform conservative cross-validation using cyclic training, validation,
        and test subsets of the labelled data.
        """

        # Assign an integer subset index to each star:
        #   0:               for choosing the regularization term.
        #   1-8 (inclusive): training set.
        #   9:               for prediction.
        subset_index = np.random.randint(0, 10, size=len(self.training_labels))
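
        # Sketch of how these subsets could be selected (added annotation for
        # this unimplemented method, not part of the original source):
        #
        #   regularization_set = (subset_index == 0)
        #   training_set = (1 <= subset_index) & (subset_index <= 8)
        #   test_set = (subset_index == 9)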

        # Start with an initial value of the regularization.

        raise NotImplementedError


def _fit_pixel(fluxes, flux_uncertainties, label_vector_array,
    regularization, **kwargs):
    """
    Return the optimal label vector coefficients and scatter for a pixel, given
    the fluxes, uncertainties, and the label vector array.

    :param fluxes:
        The fluxes for the given pixel, from all stars.

    :param flux_uncertainties:
        The 1-sigma flux uncertainties for the given pixel, from all stars.

    :param label_vector_array:
        The label vector array. This should have shape `(N_terms + 1, N_stars)`.

    :param regularization:
        The regularization term.

    :returns:
        The optimised label vector coefficients and scatter for this pixel.
    """

    # If every star has a flux uncertainty above the maximum tolerated value,
    # this pixel cannot constrain the model, so return NaN coefficients.
    max_uncertainty = kwargs.get("max_uncertainty", 1)
    failed_response = (np.nan * np.ones(label_vector_array.shape[0]),
        max_uncertainty)
    if np.all(flux_uncertainties > max_uncertainty):
        return failed_response

    # Get an initial guess of the scatter.
    scatter = np.var(fluxes) - np.median(flux_uncertainties)**2
    scatter = np.sqrt(scatter) if scatter >= 0 else np.std(fluxes)

    # Optimise the scatter, and at each scatter value we will calculate the
    # optimal vector coefficients.
    op_scatter, fopt, direc, n_iter, n_funcs, warnflag = op.fmin_powell(
        _pixel_scatter_nll, scatter,
        args=(fluxes, flux_uncertainties, label_vector_array, regularization),
        disp=False, full_output=True)

    if warnflag > 0:
        logger.warning("Warning: {}".format([
            "Maximum number of function evaluations made during optimisation.",
            "Maximum number of iterations made during optimisation."
            ][warnflag - 1]))

    # Calculate the coefficients at the optimal scatter value.
    # Note that if we can't solve for the coefficients, we just return the
    # failed response (NaN coefficients).
    try:
        coefficients, ATCiAinv, variance = cannon._fit_coefficients(
            fluxes, flux_uncertainties, op_scatter, label_vector_array)

    except np.linalg.linalg.LinAlgError:
        logger.exception("Failed to calculate coefficients")
        if kwargs.get("debug", False): raise

        return failed_response

    else:
        return (coefficients, op_scatter)
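
# Illustrative sketch (added annotation, not part of the original source):
# fitting a single pixel outside of train(), using toy arrays that follow the
# shape conventions established above:
#
#   >>> N_stars, N_terms = 100, 3
#   >>> label_vector_array = np.vstack([np.ones(N_stars),
#   ...     np.random.normal(size=(N_terms, N_stars))])
#   >>> fluxes = np.random.normal(1.0, 0.01, size=N_stars)
#   >>> flux_uncertainties = 0.01 * np.ones(N_stars)
#   >>> theta, scatter = _fit_pixel(fluxes, flux_uncertainties,
#   ...     label_vector_array, regularization=100.0)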

def _pixel_scatter_nll(scatter, fluxes, flux_uncertainties, label_vector_array,
    regularization, **kwargs):
    """
    Return the negative log-likelihood for the scatter in a single pixel.

    :param scatter:
        The model scatter in the pixel.

    :param fluxes:
        The fluxes for a given pixel (in many stars).

    :param flux_uncertainties:
        The 1-sigma uncertainties in the fluxes for a given pixel. This should
        have the same shape as `fluxes`.

    :param label_vector_array:
        The label vector array for each star, for the given pixel.

    :param regularization:
        A regularization term.

    :returns:
        The negative log-likelihood of the scatter, given the fluxes and the
        label vector array.

    :raises np.linalg.linalg.LinAlgError:
        If there was an error in inverting a matrix, and `debug` is set to True.
    """

    if 0 > scatter:
        return np.inf

    try:
        # Calculate the coefficients for the given level of scatter.
        theta, ATCiAinv, variance = cannon._fit_coefficients(
            fluxes, flux_uncertainties, scatter, label_vector_array)

    except np.linalg.linalg.LinAlgError:
        if kwargs.get("debug", False): raise
        return np.inf

    model = np.dot(theta, label_vector_array)

    return np.sum((fluxes - model)**2 / variance) \
        +  np.sum(np.log(variance)) \
        +  regularization * np.abs(theta).sum()
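
# For reference (added annotation, not part of the original source): the
# quantity returned by _pixel_scatter_nll, and minimised per pixel in
# _fit_pixel, is
#
#   sum_n (f_n - theta . x_n)^2 / variance_n
#     + sum_n log(variance_n)
#     + regularization * sum_j |theta_j|
#
# where x_n is the label vector for star n and variance_n is the per-star
# variance returned by cannon._fit_coefficients (presumably combining the flux
# uncertainty and the model scatter). The last term is the L1 penalty that
# makes this the regularized Cannon.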