Completed
Push — master ( 69b31d...2b6f25 )
by Andy
35s
created

fit_pixel_fixed_scatter()   F

Complexity

Conditions 11

Size

Total Lines 127

Duplication

Lines 0
Ratio 0 %

Importance

Changes 3
Bugs 1 Features 0
Metric Value
cc 11
c 3
b 1
f 0
dl 0
loc 127
rs 3.1764

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like fit_pixel_fixed_scatter() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
4
"""
5
Fitting functions for use in The Cannon.
6
"""
7
8
from __future__ import (division, print_function, absolute_import,
9
                        unicode_literals)
10
11
__all__ = ["fit_spectrum", "fit_pixel_fixed_scatter", "fit_theta_by_linalg",
12
    "chi_sq", "L1Norm_variation"]
13
14
import logging
15
import numpy as np
16
import scipy.optimize as op
17
from time import time
18
19
logger = logging.getLogger(__name__)
20
21
22
def fit_spectrum(flux, ivar, initial_labels, vectorizer, theta, s2, fiducials,
23
    scales, dispersion=None, **kwargs):
24
    """
25
    Fit a single spectrum by least-squared fitting.
26
    
27
    :param flux:
28
        The normalized flux values.
29
30
    :param ivar:
31
        The inverse variance array for the normalized fluxes.
32
33
    :param initial_labels:
34
        The point(s) to initialize optimization from.
35
36
    :param vectorizer:
37
        The vectorizer to use when fitting the data.
38
39
    :param theta:
40
        The theta coefficients (spectral derivatives) of the trained model.
41
42
    :param s2:
43
        The pixel scatter (s^2) array for each pixel.
44
45
    :param dispersion: [optional]
46
        The dispersion (e.g., wavelength) points for the normalized fluxes.
47
48
    :returns:
49
        A three-length tuple containing: the optimized labels, the covariance
50
        matrix, and metadata associated with the optimization.
51
    """
52
53
    adjusted_ivar = ivar/(1. + ivar * s2)
54
    
55
    # Exclude non-finite points (e.g., points with zero inverse variance 
56
    # or non-finite flux values, but the latter shouldn't exist anyway).
57
    use = np.isfinite(flux * adjusted_ivar) * (adjusted_ivar > 0)
58
    L = len(vectorizer.label_names)
59
60
    if not np.any(use):
61
        logger.warn("No information in spectrum!")
62
        return (np.nan * np.ones(L), None, {
63
                "fail_message": "Pixels contained no information"})
64
65
    # Splice the arrays we will use most.
66
    flux = flux[use]
67
    weights = np.sqrt(adjusted_ivar[use]) # --> 1.0 / sigma
68
    use_theta = theta[use]
69
70
    initial_labels = np.atleast_2d(initial_labels)
71
72
    # Check the vectorizer whether it has a derivative built in.
73
    Dfun = kwargs.pop("Dfun", True)
74
    if Dfun not in (None, False):
75
        try:
76
            vectorizer.get_label_vector_derivative(initial_labels[0])
77
        
78
        except NotImplementedError:
79
            Dfun = None
80
            logger.warn("No label vector derivatives available in {}!".format(
81
                vectorizer))
82
            
83
        except:
84
            logger.exception("Exception raised when trying to calculate the "\
85
                             "label vector derivative at the fiducial values:")
86
            raise
87
88
        else:
89
            # Use the label vector derivative.
90
            Dfun = lambda parameters: weights * np.dot(use_theta,
91
                vectorizer.get_label_vector_derivative(parameters)).T
92
93
    else:
94
        Dfun = None
95
96
    def func(parameters):
97
        return np.dot(use_theta, vectorizer(parameters))[:, 0]
98
        
99
    def residuals(parameters):
100
        return weights * (func(parameters) - flux)
101
102
    kwds = {
103
        "func": residuals,
104
        "Dfun": Dfun,
105
        "col_deriv": True,
106
107
        # These get passed through to leastsq:
108
        "ftol": 7./3 - 4./3 - 1, # Machine precision.
109
        "xtol": 7./3 - 4./3 - 1, # Machine precision.
110
        "gtol": 0.0,
111
        "maxfev": 100000, # MAGIC
112
        "epsfcn": None,
113
        "factor": 1.0, 
114
    }
115
116
    # Only update the keywords with things that op.curve_fit/op.leastsq expects.
117
    for key in set(kwargs).intersection(kwds):
118
        kwds[key] = kwargs[key]
119
120
    results = []
121
    for x0 in initial_labels:
122
123
        try:
124
            op_labels, cov, meta, mesg, ier = op.leastsq(
125
                x0=(x0 - fiducials)/scales, full_output=True, **kwds)
126
            
127
        except RuntimeError:
128
            logger.exception("Exception in fitting from {}".format(x0))
129
            continue
130
131
        meta.update(
132
            dict(x0=x0, chi_sq=np.sum(meta["fvec"]**2), ier=ier, mesg=mesg))
133
        results.append((op_labels, cov, meta))
134
135
    if len(results) == 0:
136
        logger.warn("No results found!")
137
        return (np.nan * np.ones(L), None, dict(fail_message="No results found"))
138
139
    best_result_index = np.nanargmin([m["chi_sq"] for (o, c, m) in results])
140
    op_labels, cov, meta = results[best_result_index]
141
142
    # De-scale the optimized labels.
143
    meta["model_flux"] = func(op_labels)
144
    op_labels = op_labels * scales + fiducials
145
146
    if np.allclose(op_labels, meta["x0"]):
147
        logger.warn(
148
            "Discarding optimized result because it is exactly the same as the "
149
            "initial value!")
150
151
        # We are in dire straits. We should not trust the result.
152
        op_labels *= np.nan
153
        meta["fail_message"] = "Optimized result same as initial value."
154
155
    if not np.any(np.isfinite(cov)):
156
        logger.warn("Non-finite covariance matrix returned!")
157
    
158
    # Save additional information.
159
    meta.update({
160
        "method": "leastsq",
161
        "label_names": vectorizer.label_names,
162
        "best_result_index": best_result_index,
163
        "derivatives_used": Dfun is not None,
164
        "snr": np.nanmedian(flux * weights),
165
        "r_chi_sq": meta["chi_sq"]/(use.sum() - L - 1),
166
    })
167
    for key in ("ftol", "xtol", "gtol", "maxfev", "factor", "epsfcn"):
168
        meta[key] = kwds[key]
169
    
170
    return (op_labels, cov, meta)
171
172
173
174
def fit_theta_by_linalg(flux, ivar, s2, design_matrix):
175
    """
176
    Fit theta coefficients to a set of normalized fluxes for a single pixel.
177
178
    :param flux:
179
        The normalized fluxes for a single pixel (across many stars).
180
181
    :param ivar:
182
        The inverse variance of the normalized flux values for a single pixel
183
        across many stars.
184
185
    :param s2:
186
        The noise residual (squared scatter term) to adopt in the pixel.
187
188
    :param design_matrix:
189
        The model design matrix.
190
191
    :returns:
192
        The label vector coefficients for the pixel, and the inverse variance 
193
        matrix.
194
    """
195
196
    adjusted_ivar = ivar/(1. + ivar * s2)
197
    CiA = design_matrix * np.tile(adjusted_ivar, (design_matrix.shape[1], 1)).T
198
    try:
199
        ATCiAinv = np.linalg.inv(np.dot(design_matrix.T, CiA))
200
    except np.linalg.linalg.LinAlgError:
201
        N = design_matrix.shape[1]
202
        return (np.hstack([1, np.zeros(N - 1)]), np.inf * np.eye(N))
203
204
    ATY = np.dot(design_matrix.T, flux * adjusted_ivar)
205
    theta = np.dot(ATCiAinv, ATY)
206
207
    return (theta, ATCiAinv)
208
209
210
211
# TODO: This logic should probably go somewhere else.
212
213
    
214
def chi_sq(theta, design_matrix, flux, ivar, axis=None, gradient=True):
215
    """
216
    Calculate the chi-squared difference between the spectral model and flux.
217
218
    :param theta:
219
        The theta coefficients.
220
221
    :param design_matrix:
222
        The model design matrix.
223
224
    :param flux:
225
        The normalized flux values.
226
227
    :param ivar:
228
        The inverse variances of the normalized flux values.
229
230
    :param axis: [optional]
231
        The axis to sum the chi-squared values across.
232
233
    :param gradient: [optional]
234
        Return the chi-squared value and its derivatives (Jacobian).
235
236
    :returns:
237
        The chi-squared difference between the spectral model and flux, and
238
        optionally, the Jacobian.
239
    """
240
    residuals = np.dot(theta, design_matrix.T) - flux
241
242
    ivar_residuals = ivar * residuals
243
    f = np.sum(ivar_residuals * residuals, axis=axis)
244
    if not gradient:
245
        return f
246
247
    g = 2.0 * np.dot(design_matrix.T, ivar_residuals)
248
    return (f, g)
249
250
251
def L1Norm_variation(theta):
252
    """
253
    Return the L1 norm of theta (except the first entry) and its derivative.
254
255
    :param theta:
256
        An array of finite values.
257
258
    :returns:
259
        A two-length tuple containing: the L1 norm of theta (except the first 
260
        entry), and the derivative of the L1 norm of theta.
261
    """
262
263
    return (np.sum(np.abs(theta[1:])), np.hstack([0.0, np.sign(theta[1:])]))
264
265
266
def _pixel_objective_function_fixed_scatter(theta, design_matrix, flux, ivar, 
267
    regularization, gradient=True):
268
    """
269
    The objective function for a single regularized pixel with fixed scatter.
270
271
    :param theta:
272
        The spectral coefficients.
273
274
    :param normalized_flux:
275
        The normalized flux values for a single pixel across many stars.
276
277
    :param adjusted_ivar:
278
        The adjusted inverse variance of the normalized flux values for a single 
279
        pixel across many stars. This adjusted inverse variance array should
280
        already have the scatter included.
281
282
    :param regularization:
283
        The regularization term to scale the L1 norm of theta with.
284
285
    :param design_matrix:
286
        The design matrix for the model.
287
288
    :param gradient: [optional]
289
        Also return the analytic derivative of the objective function.
290
    """
291
292
    if gradient:
293
        csq, d_csq = chi_sq(theta, design_matrix, flux, ivar, gradient=True)
294
        L1, d_L1 = L1Norm_variation(theta)
295
296
        f = csq + regularization * L1
297
        g = d_csq + regularization * d_L1
298
        
299
        return (f, g)
300
301
    else:
302
        csq = chi_sq(theta, design_matrix, flux, ivar, gradient=False)
303
        L1, d_L1 = L1Norm_variation(theta)
304
305
        return csq + regularization * L1
306
307
308
def _scatter_objective_function(scatter, residuals_squared, ivar):
309
    adjusted_ivar = ivar/(1.0 + ivar * scatter**2)
310
    chi_sq = residuals_squared * adjusted_ivar
311
    return (np.mean(chi_sq) - 1.0)**2
312
    
313
314
def fit_pixel_fixed_scatter(flux, ivar, initial_thetas, design_matrix, 
315
    regularization, censoring_mask, **kwargs): 
316
    """
317
    Fit theta coefficients and noise residual for a single pixel, using
318
    an initially fixed scatter value.
319
320
    :param flux:
321
        The normalized flux values.
322
323
    :param ivar:
324
        The inverse variance array for the normalized fluxes.
325
326
    :param initial_thetas:
327
        A list of initial theta values to start from, and their source. For
328
        example: `[(theta_0, "guess"), (theta_1, "old_theta")]
329
330
    :param design_matrix:
331
        The model design matrix.
332
333
    :param regularization:
334
        The regularization strength to apply during optimization (Lambda).
335
336
    :param censoring_mask:
337
        A per-label censoring mask for each pixel.
338
339
    :keyword op_method:
340
        The optimization method to use. Valid options are: `l_bfgs_b`, `powell`.
341
342
    :keyword op_kwds:
343
        A dictionary of arguments that will be provided to the optimizer.
344
345
    :returns:
346
        The optimized theta coefficients, the noise residual `s2`, and
347
        metadata related to the optimization process.
348
    """ 
349
350
    if np.sum(ivar) < 1.0 * ivar.size: # MAGIC
351
        metadata = dict(message="No pixel information.", op_time=0.0)
352
        fiducial = np.hstack([1.0, np.zeros(design_matrix.shape[1] - 1)])
353
        return (fiducial, np.inf, metadata) # MAGIC
354
355
    # Determine if any theta coefficients will be censored.
356
    censored_theta = ~np.any(np.isfinite(design_matrix), axis=0)
357
    # Make the design matrix safe to use.
358
    design_matrix[:, censored_theta] = 0
359
360
    feval = []
361
    for initial_theta, initial_theta_source in initial_thetas:
362
        feval.append(_pixel_objective_function_fixed_scatter(
363
            initial_theta, design_matrix, flux, ivar, regularization, False))
364
365
    initial_theta, initial_theta_source = initial_thetas[np.nanargmin(feval)]
366
367
    op_kwds = dict(x0=initial_theta,
368
        args=(design_matrix, flux, ivar, regularization), 
369
        disp=False, maxfun=np.inf, maxiter=np.inf)
370
371
    if any(censored_theta):
372
        # If the initial_theta is the same size as the censored_mask, but different
373
        # to the design_matrix, then we need to censor the initial theta so that we
374
        # don't bother solving for those parameters.
375
        op_kwds["x0"] = np.array(op_kwds["x0"])[~censored_theta]
376
        op_kwds["args"] = (design_matrix[:, ~censored_theta], flux, ivar,
377
            regularization)
378
        
379
    # Allow either l_bfgs_b or powell
380
    t_init = time()
381
    op_method = kwargs.get("op_method", "l_bfgs_b").lower()
382
    if op_method == "l_bfgs_b":
383
384
        op_kwds.update(m=design_matrix.shape[1], factr=10.0, pgtol=1e-6)
385
        op_kwds.update(kwargs.get("op_kwds", {}))
386
387
        # If op_bounds are given and we are censoring some theta terms, then we
388
        # will need to adjust which op_bounds we provide.
389
        if "bounds" in op_kwds and any(censored_theta):
390
            op_kwds["bounds"] = \
391
                [b for b, is_censored in zip(op_kwds["bounds"], censored_theta) \
392
                                      if not is_censored]
393
 
394
        op_params, fopt, metadata = op.fmin_l_bfgs_b(
395
            _pixel_objective_function_fixed_scatter, 
396
            fprime=None, approx_grad=None, **op_kwds)
397
398
        metadata.update(dict(fopt=fopt))
399
400
    elif op_method == "powell":
401
        
402
        op_kwds.update(xtol=1e-6, ftol=1e-6)
403
        op_kwds.update(kwargs.get("op_kwds", {}))
404
405
        # Set 'False' in args so that we don't return the gradient, because fmin
406
        # doesn't want it.
407
        args = list(op_kwds["args"])
408
        args.append(False)
409
        op_kwds["args"] = tuple(args)
410
411
        t_init = time()
412
413
        op_params, fopt, direc, n_iter, n_funcs, warnflag = op.fmin_powell(
414
            _pixel_objective_function_fixed_scatter,full_output=True, **op_kwds)
415
416
        metadata = dict(fopt=fopt, direc=direc, n_iter=n_iter, n_funcs=n_funcs,
417
            warnflag=warnflag)
418
419
    else:
420
        raise ValueError("unknown optimization method '{}' -- "
421
                         "powell or l_bfgs_b are available".format(op_method))
422
423
    # Additional metadata common to both optimizers.
424
    metadata.update(dict(op_method=op_method, op_time=time() - t_init,
425
        initial_theta=initial_theta, initial_theta_source=initial_theta_source))
426
427
    # De-censor the optimized parameters.
428
    if any(censored_theta):
429
        theta = np.zeros(censored_theta.size)
430
        theta[~censored_theta] = op_params
431
432
    else:
433
        theta = op_params
434
435
    # Fit the scatter.
436
    residuals_squared = (flux - np.dot(theta, design_matrix.T))**2
437
    scatter = op.fmin(_scatter_objective_function, 0.0,
438
        args=(residuals_squared, ivar), disp=False)
439
440
    return (theta, scatter**2, metadata)
441