Completed
Push to master (4fc32f...027495) by Andy, created 01:22

RegularizedCannonModel.regularization()   F

Complexity: Conditions 9
Size: Total Lines 3
Duplication: Lines 0, Ratio 0 %
Importance: Changes 2, Bugs 0, Features 0

Metric   Value
dl       0
loc      3
rs       3
c        2
b        0
f        0
cc       9
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
A regularized (compressed sensing) version of The Cannon.
"""

from __future__ import (division, print_function, absolute_import,
                        unicode_literals)

__all__ = ["RegularizedCannonModel"]

import logging
import numpy as np
import scipy.optimize as op

from . import (cannon, model, utils)

logger = logging.getLogger(__name__)


class RegularizedCannonModel(cannon.CannonModel):
    """
    An L1-regularized edition of The Cannon model for the estimation of
    arbitrary stellar labels.

    :param labels:
        A table with columns as labels, and stars as rows.

    :type labels:
        :class:`~astropy.table.Table` or numpy structured array

    :param fluxes:
        An array of fluxes for stars in the training set, given as shape
        `(num_stars, num_pixels)`. The `num_stars` should match the number of
        rows in `labels`.

    :type fluxes:
        :class:`np.ndarray`

    :param flux_uncertainties:
        An array of 1-sigma flux uncertainties for stars in the training set.
        The shape of `flux_uncertainties` should match that of `fluxes`.

    :type flux_uncertainties:
        :class:`np.ndarray`

    :param dispersion: [optional]
        The dispersion values corresponding to the given pixels. If provided,
        this should have length `num_pixels`.

    :param live_dangerously: [optional]
        If enabled, no checks will be made on the label names, preventing the
        user from providing human-readable forms of the label vector.
    """

    _descriptive_attributes = ["_label_vector", "_regularization"]

    def __init__(self, *args, **kwargs):
        super(RegularizedCannonModel, self).__init__(*args, **kwargs)


    @property
    def regularization(self):
        return self._regularization


    @regularization.setter
    def regularization(self, regularization):
        if regularization is None:
            self._regularization = None
            return None

        # Can be a positive float, or positive values for all pixels.
        try:
            regularization = float(regularization)
        except (TypeError, ValueError):
            regularization = np.array(regularization).flatten()

            if regularization.size != len(self.dispersion):
                raise ValueError("regularization must be a positive value or "
                                 "an array of positive values for each pixel "
                                 "({0} != {1})".format(
                                    regularization.size,
                                    len(self.dispersion)))

            if np.any(0 > regularization) \
            or not np.all(np.isfinite(regularization)):
                raise ValueError("regularization terms must be "
                                 "positive and finite")
        else:
            if 0 > regularization or not np.isfinite(regularization):
                raise ValueError("regularization term must be "
                                 "positive and finite")
            regularization = np.ones_like(self.dispersion) * regularization
        self._regularization = regularization
        return None
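
    # Note: the setter above broadcasts a scalar regularization value to a
    # per-pixel array of length len(self.dispersion), so the per-pixel lookup
    # self.regularization[pixel] in train() below is always well defined.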


    # Windows to specify zero coefficients for a given label (or terms
    # comprising that label).


    @model.requires_model_description
    def train(self, **kwargs):
        """
        Train the model based on the training set and the description of the
        label vector, and enforce regularization.
        """

        # Initialise the scatter and coefficient arrays.
        N_pixels = len(self.dispersion)
        scatter = np.nan * np.ones(N_pixels)
        label_vector_array = self.label_vector_array
        theta = np.nan * np.ones((N_pixels, label_vector_array.shape[0]))

        # Details for the progressbar.
        pb_kwds = {
            "message": "Training regularized Cannon model from {0} stars with "\
                       "{1} pixels each".format(
                           len(self.training_labels), N_pixels),
            "size": 100 if kwargs.pop("progressbar", True) else -1
        }

        if self.pool is None:
            for pixel in utils.progressbar(range(N_pixels), **pb_kwds):
                theta[pixel, :], scatter[pixel] = _fit_pixel(
                    self.training_fluxes[:, pixel],
                    self.training_flux_uncertainties[:, pixel],
                    label_vector_array, self.regularization[pixel],
                    **kwargs)

        else:
            # Not as nice as just mapping, but necessary for a progress bar.
            process = { pixel: self.pool.apply_async(_fit_pixel, args=(
                    self.training_fluxes[:, pixel],
                    self.training_flux_uncertainties[:, pixel],
                    label_vector_array, self.regularization[pixel]
                ), kwds=kwargs) \
                for pixel in range(N_pixels) }

            for pixel, proc in utils.progressbar(process.items(), **pb_kwds):
                theta[pixel, :], scatter[pixel] = proc.get()

        self.coefficients, self.scatter = (theta, scatter)
        self._trained = True

        return (theta, scatter)


    def conservative_cross_validation(self, **kwargs):
        """
        Perform conservative cross-validation using cyclic training, validation,
        and test subsets of the labelled data.
        """

        """
        Randomly assign an integer subset index to each star:

        0: for choosing the regularization term.
        1-8 inclusive: training set.
        9: for prediction.
        """

        subset_index = np.random.randint(0, 10, size=len(self.training_labels))
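
        # For example, a training-set mask for the scheme above would look like
        # (1 <= subset_index) & (subset_index <= 8), with subsets 0 and 9 held
        # out for choosing the regularization term and for prediction.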

        # Start with an initial value of the regularization.

        raise NotImplementedError


def _fit_pixel(fluxes, flux_uncertainties, label_vector_array, 
    regularization, **kwargs):
    """
    Return the optimal label vector coefficients and scatter for a pixel, given
    the fluxes, uncertainties, and the label vector array.

    :param fluxes:
        The fluxes for the given pixel, from all stars.

    :param flux_uncertainties:
        The 1-sigma flux uncertainties for the given pixel, from all stars.

    :param label_vector_array:
        The label vector array. This should have shape `(N_terms, N_stars)`.

    :param regularization:
        The regularization term.

    :returns:
        The optimised label vector coefficients and scatter for this pixel.
    """

    max_uncertainty = kwargs.get("max_uncertainty", 1)
    failed_response = (np.nan * np.ones(label_vector_array.shape[0]),
                       max_uncertainty)
    if np.all(flux_uncertainties >= max_uncertainty):
        return failed_response

    # Get an initial guess of the scatter.
    scatter = np.var(fluxes) - np.median(flux_uncertainties)**2
    scatter = np.sqrt(scatter) if scatter >= 0 else np.std(fluxes)
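    # (The guess above is the excess of the observed flux variance over the
    # median measurement variance, falling back to the standard deviation of
    # the fluxes when that excess is negative.)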

    # Optimise the scatter, and at each scatter value we will calculate the
    # optimal vector coefficients.
    op_scatter, fopt, direc, n_iter, n_funcs, warnflag = op.fmin_powell(
        _pixel_scatter_nll, scatter,
        args=(fluxes, flux_uncertainties, label_vector_array, regularization),
        disp=False, full_output=True)

    if warnflag > 0:
        logger.warning("Warning: {}".format([
            "Maximum number of function evaluations made during optimisation.",
            "Maximum number of iterations made during optimisation."
            ][warnflag - 1]))

    # Calculate the coefficients at the optimal scatter value.
    # Note that if we cannot solve for the coefficients, we just return the
    # failure response (NaN coefficients and the maximum uncertainty as the
    # scatter).
    try:
        coefficients, ATCiAinv, variance = cannon._fit_coefficients(
            fluxes, flux_uncertainties, op_scatter, label_vector_array)

    except np.linalg.linalg.LinAlgError:
        logger.exception("Failed to calculate coefficients")
        if kwargs.get("debug", False): raise

        return failed_response

    else:
        return (coefficients, op_scatter)


def _pixel_scatter_nll(scatter, fluxes, flux_uncertainties, label_vector_array,
    regularization, **kwargs):
    """
    Return the negative log-likelihood for the scatter in a single pixel.

    :param scatter:
        The model scatter in the pixel.

    :param fluxes:
        The fluxes for a given pixel (in many stars).

    :param flux_uncertainties:
        The 1-sigma uncertainties in the fluxes for a given pixel. This should
        have the same shape as `fluxes`.

    :param label_vector_array:
        The label vector array for each star, for the given pixel.

    :param regularization:
        A regularization term.

    :returns:
        The negative log-likelihood of the scatter, given the fluxes and the
        label vector array.

    :raises np.linalg.linalg.LinAlgError:
        If there was an error in inverting a matrix, and `debug` is set to True.
    """

    if 0 > scatter:
        return np.inf

    try:
        # Calculate the coefficients for the given level of scatter.
        theta, ATCiAinv, variance = cannon._fit_coefficients(
            fluxes, flux_uncertainties, scatter, label_vector_array)

    except np.linalg.linalg.LinAlgError:
        if kwargs.get("debug", False): raise
        return np.inf

    model = np.dot(theta, label_vector_array)

    return np.sum((fluxes - model)**2 / variance) \
        +  np.sum(np.log(variance)) \
        +  regularization * np.abs(theta).sum()
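
For orientation, here is a minimal usage sketch of the class above. It is not
part of the file: the constructor arguments follow the class docstring (the
exact signature is inherited from cannon.CannonModel), the label vector string
syntax and the module name used in the import are assumptions, and the training
data are random placeholders.

import numpy as np
from astropy.table import Table

from regularized import RegularizedCannonModel  # hypothetical module name

# Placeholder training set: a label table, flux and 1-sigma uncertainty arrays
# of shape (num_stars, num_pixels), and a dispersion array of length num_pixels.
num_stars, num_pixels = 50, 1000
labels = Table({
    "TEFF": np.random.uniform(4000, 6500, num_stars),
    "LOGG": np.random.uniform(1.0, 4.5, num_stars),
})
fluxes = 1.0 + 0.01 * np.random.randn(num_stars, num_pixels)
flux_uncertainties = 0.01 * np.ones((num_stars, num_pixels))
dispersion = np.linspace(15100, 16900, num_pixels)

reg_model = RegularizedCannonModel(labels, fluxes, flux_uncertainties,
                                   dispersion=dispersion)
reg_model.label_vector = "TEFF^2 + TEFF + LOGG"  # assumed human-readable syntax
reg_model.regularization = 100.0  # a scalar is broadcast to every pixel

# train() returns (and stores) the per-pixel coefficients and scatter.
theta, scatter = reg_model.train()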