Issues (587)

mutis/signal.py (11 issues)

1
# Licensed under a 3-clause BSD style license - see LICENSE
2
"""Synthetic generation of light curves."""
3
4
import logging
5
6
import matplotlib.pyplot as plt
7
import numpy as np
8
import scipy.optimize as scipy_optimize
9
import scipy.signal as scipy_signal
10
import scipy.special as scipy_special
11
import scipy.stats as scipy_stats
12
from matplotlib.offsetbox import AnchoredText
13
14
from mutis.lib.signal import *
15
16
__all__ = ["Signal"]
17
18
log = logging.getLogger(__name__)
19
20
21
class Signal:
22
    """Analysis and generation of a signal.
23
24
    Class for a generic signal.
25
26
    Attributes
27
    ----------
28
    times : :class:`numpy.ndarray` or :class:`pandas.Series`
29
        Values of the time axis.
30
    values : :class:`numpy.ndarray` or :class:`pandas.Series`
31
        Values of the signal axis.
32
    dvalues : :class:`numpy.ndarray` or :class:`pandas.Series`
33
        Uncertainties on the values of the signal axis.
34
35
    Other Attributes
36
    ----------------
37
38
    fgen : :class:`str`
39
        Method to generate the synthetic signal.
40
41
    synth : :class:`numpy.ndarray`
42
        Synthetic signals generated with `fgen`.
43
44
    OU_theta : :class:`float`
45
        (For OU method) Parameter theta of the OU process.
46
    OU_mu : :class:`float`
47
        (For OU method) Parameter mu of the OU process.
48
    OU_sigma : :class:`float`
49
        (For OU method) Parameter sigma of the OU process.
50
    """
51
52
    def __init__(self, times, values, dvalues=None, fgen=None):
53
        
54
        self.times = np.array(times, dtype=np.float64)
55
        self.values = np.array(values, dtype=np.float64)
56
        self.fgen = fgen
57
58
        self.dvalues = np.array(dvalues, dtype=np.float64) if dvalues is not None else None
59
        self.synth = None
60
61
        # TODO make attributes below specific of OU method / not the entire class
62
        self.OU_theta = None
63
        self.OU_mu = None
64
        self.OU_sigma = None
65
66
    def plot(self, ax=None):
67
        if ax is None:
68
            ax = plt.gca()
69
        return ax.plot(self.times, self.values, ".-", lw=1, alpha=0.7)
70
71
    def gen_synth(self, samples):
72
        """Generate synthethic light curves for this signal.
73
        
74
        Generated curves are saved in `self.synth`. The times used are the same
75
        of this signal.
76
        
77
        Parameters
78
        ----------
79
        samples: :int: number of samples to generate.
80
        
81
        """
82
83
        self.synth = np.empty((samples, self.times.size))
84
        for n in range(samples):
85
            self.synth[n] = fgen_wrapper(fgen=self.fgen, t=self.times, y=self.values,
86
                         fgen_params={'theta':self.OU_theta, 'mu':self.OU_mu, 'sigma':self.OU_sigma})
0 ignored issues
show
Wrong continued indentation (add 16 spaces).
Loading history...
87
88
89
    def OU_fit(self, bins=None, rang=None, a=1e-5, b=100):
0 ignored issues
show
Comprehensibility introduced by
This function exceeds the maximum number of variables (20/15).
Loading history...
90
        """Fit the signal to an OU stochastic process, using several statistical approaches.
91
92
        This function tries to fit the signal to an OU stochastic
93
        process using both basic curve fitting and the Maximum
94
        Likelihood Estimation method, and returns some plots of
95
        the signals and its properties, and the estimated parameters.
96
        """
97
98
        # TODO: make a generic fit method
99
        res = dict()
100
101
        y = self.values
102
        t = self.times
103
104
        dy = np.diff(y)
105
        dt = np.diff(t)
106
107
        # estimate sigma
108
        try:
109
            sigma_est = (np.nanmean(dy ** 2 / y[:-1] ** 2 / dt)) ** 0.5
110
            res["sigma_est"] = sigma_est
111
        except Exception as e:
0 ignored issues
show
Catching very general exceptions such as Exception is usually not recommended.

Generally, you would want to handle very specific errors in the exception handler. This ensure that you do not hide other types of errors which should be fixed.

So, unless you specifically plan to handle any error, consider adding a more specific exception.

Loading history...
112
            log.error(f"Could not estimate sigma: {e}")
0 ignored issues
show
Use lazy % formatting in logging functions
Loading history...
113
114
        # plot histogram
115
        if bins is None:
116
            bins = int(y.size ** 0.5 / 1.5)  # bins='auto'
117
        if rang is None:
118
            rang = (np.percentile(y, 0), np.percentile(y, 99))
119
120
        p, x = np.histogram(y, density=True, bins=bins, range=rang)  # bins='sqrt')
121
        x = (x + np.roll(x, -1))[:-1] / 2.0
122
123
        plt.subplots()
124
125
        plt.hist(y, density=True, alpha=0.75, bins=bins, range=rang)
126
        plt.plot(x, p, "r-", alpha=0.5)
127
128
        anchored_text = AnchoredText(
129
            f"mean    {np.mean(y):.2g} \n "
130
            f"median  {np.median(y):.2g} \n "
131
            f"mode    {scipy_stats.mode(y)[0]:.2g} \n "
132
            f"std     {np.std(y):.2g} \n "
133
            f"var     {np.var(y):.2g}",
134
            loc="upper right",
135
        )
136
        plt.gca().add_artist(anchored_text)
137
138
        # curve_fit
139
        try:
140
            popt, pcov = scipy_optimize.curve_fit(f=self.pdf, xdata=x, ydata=p)
141
            # print('curve_fit: (l, mu)')
142
            # print('popt: ')
143
            # print(popt)
144
            # print('pcov: ')
145
            # print(np.sqrt(np.diag(pcov)))
146
            x_c = np.linspace(1e-5, 1.1 * np.max(x), 1000)
147
            plt.plot(x_c, self.pdf(x_c, *popt), "k-", label="curve_fit", alpha=0.8)
148
            res["curve_fit"] = (popt, np.sqrt(np.diag(pcov)))
149
        except Exception as e:
0 ignored issues
show
Catching very general exceptions such as Exception is usually not recommended.

Generally, you would want to handle very specific errors in the exception handler. This ensure that you do not hide other types of errors which should be fixed.

So, unless you specifically plan to handle any error, consider adding a more specific exception.

Loading history...
150
            log.error(f"Some error fitting with curve_fit {e}")
0 ignored issues
show
Use lazy % formatting in logging functions
Loading history...
151
152
        # fit pdf with MLE
153
        # TODO: place outside as a helper class
154
        #       mu here is different than self.mu
155
        class OU(scipy_stats.rv_continuous):
156
            def _pdf(self, x, l, mu):
157
                return (
158
                    (l * mu) ** (1 + l)
159
                    / scipy_special.gamma(1 + l)
160
                    * np.exp(-l * mu / x)
161
                    / x ** (l + 2)
162
                )
163
164
        try:
165
            fit = OU(a=a, b=np.percentile(y, b)).fit(y, 1, 1, floc=0, fscale=1)
166
            # print('MLE fit: (l, mu)')
167
            # print(fit)
168
            x_c = np.linspace(0, 1.1 * np.max(x), 1000)
169
            plt.plot(x_c, self.pdf(x_c, fit[0], fit[1]), "k-.", label="MLE", alpha=0.8)
170
            res["MLE_fit"] = fit[:-2]
171
        except Exception as e:
0 ignored issues
show
Catching very general exceptions such as Exception is usually not recommended.

Generally, you would want to handle very specific errors in the exception handler. This ensure that you do not hide other types of errors which should be fixed.

So, unless you specifically plan to handle any error, consider adding a more specific exception.

Loading history...
172
            log.error(f"Some error fitting with MLE {e}")
0 ignored issues
show
Use lazy % formatting in logging functions
Loading history...
173
174
        plt.legend(loc="lower right")
175
        # plt.show()
176
177
        # estimate theta (from curve_fit)
178
        try:
179
            res["th_est1"] = fit[0] * sigma_est ** 2 / 2
180
        except NameError as e:
181
            res["th_est1"] = None
182
183
        # estimate theta (from MLE)
184
        try:
185
            res["th_est2"] = popt[0] * sigma_est ** 2 / 2
186
        except NameError as e:
187
            res["th_est2"] = None
188
189
        return res
190
        
191
        
192
    def check_gen(self, fgen=None, fgen_params=None, fpsd="lombscargle", **axes):
0 ignored issues
show
Comprehensibility introduced by
This function exceeds the maximum number of variables (16/15).
Loading history...
193
        """Check the generation of synthetic signals.
194
195
        This function checks the generation of a synthetic light curve.
196
        
197
        Parameters
198
        ----------
199
        fgen: :str:
200
            A valid fgen method name.
201
        fgen_params: :dict:
202
            Parameters for fgen (e.g. for fgen='lc_gen_ou', a dict containing
203
            values for theta, mu and sigma).
204
205
        Returns
206
        -------
207
        axes: :tuple:
208
            Tuple of three axes, on which:
209
            - The first plot show both the original signal and the synthetic one.
210
            - The second plot shows the histogram of the values taken by both signals.
211
            - The third plot shows their PSD.
212
        """
213
        
214
        t, y = self.times, self.values
215
216
        y2 = fgen_wrapper(fgen=fgen, t=t, y=y, fgen_params=fgen_params)
217
        
218
        print(f"Original vs Synthethic:",
0 ignored issues
show
Using an f-string that does not have any interpolated variables
Loading history...
219
              f"mean: {np.mean(y)} / {np.mean(y2)}",
220
              f"std: {np.std(y)} / {np.std(y2)}", sep='\n')
221
        
222
        if ("ax1" not in axes) or ("ax2" not in axes) or ("ax3" not in axes):
223
            fig, (axes["ax1"], axes["ax2"], axes["ax3"]) = plt.subplots(
0 ignored issues
show
The variable fig seems to be unused.
Loading history...
224
                nrows=1, ncols=3, figsize=(20, 4)
225
            )
226
227
        # plot the two signals
228
        axes["ax1"].plot(t, y, "b-", label="orig", lw=0.5, alpha=0.8)
229
230
        # ax1p = ax1.twinx()
231
        axes["ax1"].plot(t, y2, "r-", label="gen", lw=0.5, alpha=0.8)
232
        axes["ax1"].set_title("light curves")
233
234
        # plot their histogram
235
        bins = "auto"  # bins = int(y.size**0.5/1.5) #
236
        rang = (np.percentile(y, 0), np.percentile(y, 99))
237
        axes["ax2"].hist(y, density=True, color="b", alpha=0.4, bins=bins, range=rang)
238
239
        # ax2p = ax2.twinx()
240
        bins = "auto"  # bins = int(y.size**0.5/1.5) #
241
        rang = (np.percentile(y2, 0), np.percentile(y2, 99))
242
        axes["ax2"].hist(y2, density=True, color="r", alpha=0.4, bins=bins, range=rang)
243
244
        axes["ax2"].set_title("pdf")
245
246
        # plot their PSD
247
        if fpsd == "lombscargle":
248
            k = np.linspace(1e-3, self.values.size / 2, self.values.size // 2)
249
            freqs = k / 2 / np.pi
250
251
            pxx = scipy_signal.lombscargle(t, y, freqs, normalize=True)
252
            axes["ax3"].plot(freqs, pxx, "b-", lw=1, alpha=0.5)
253
254
            pxx2 = scipy_signal.lombscargle(t, y2, freqs, normalize=True)
255
            axes["ax3"].plot(freqs, pxx2, "r-", lw=1, alpha=0.5)
256
257
            axes["ax3"].set_xscale("log")
258
            # ax3.set_yscale('log')
259
        else:
260
            axes["ax3"].psd(y, color="b", lw=1, alpha=0.5)
261
262
            ax3p = axes["ax3"].twinx()
263
            ax3p.psd(y2, color="r", lw=1, alpha=0.5)
264
265
        axes["ax3"].set_title("PSD")
266
267
        return axes
268
    
269
    
270
        
271
    @staticmethod
272
    def pdf(xx, ll, mu):
273
        """Helper func to fit pdf as a curve."""
274
        return (
275
            (ll * mu) ** (1 + ll)
276
            / scipy_special.gamma(1 + ll)
277
            * np.exp(-ll * mu / xx)
278
            / xx ** (ll + 2)
279
        )
280