1 | # Licensed under a 3-clause BSD style license - see LICENSE |
||
2 | """Synthetic generation of light curves.""" |
||
3 | |||
4 | import logging |
||
5 | |||
6 | import matplotlib.pyplot as plt |
||
7 | import numpy as np |
||
8 | import scipy.optimize as scipy_optimize |
||
9 | import scipy.signal as scipy_signal |
||
10 | import scipy.special as scipy_special |
||
11 | import scipy.stats as scipy_stats |
||
12 | from matplotlib.offsetbox import AnchoredText |
||
13 | |||
14 | from mutis.lib.signal import * |
||
15 | |||
16 | __all__ = ["Signal"] |
||
17 | |||
18 | log = logging.getLogger(__name__) |
||
19 | |||
20 | |||
21 | class Signal: |
||
22 | """Analysis and generation of a signal. |
||
23 | |||
24 | Class for a generic signal. |
||
25 | |||
26 | Attributes |
||
27 | ---------- |
||
28 | times : :class:`numpy.ndarray` or :class:`pandas.Series` |
||
29 | Values of the time axis. |
||
30 | values : :class:`numpy.ndarray` or :class:`pandas.Series` |
||
31 | Values of the signal axis. |
||
32 | dvalues : :class:`numpy.ndarray` or :class:`pandas.Series` |
||
33 | Uncertainties on the values of the signal axis. |
||
34 | |||
35 | Other Attributes |
||
36 | ---------------- |
||
37 | |||
38 | fgen : :class:`str` |
||
39 | Method to generate the synthetic signal. |
||
40 | |||
41 | synth : :class:`numpy.ndarray` |
||
42 | Synthetic signals generated with `fgen`. |
||
43 | |||
44 | OU_theta : :class:`float` |
||
45 | (For OU method) Parameter theta of the OU process. |
||
46 | OU_mu : :class:`float` |
||
47 | (For OU method) Parameter mu of the OU process. |
||
48 | OU_sigma : :class:`float` |
||
49 | (For OU method) Parameter sigma of the OU process. |
||
50 | """ |
||
51 | |||
52 | def __init__(self, times, values, dvalues=None, fgen=None): |
||
53 | |||
54 | self.times = np.array(times, dtype=np.float64) |
||
55 | self.values = np.array(values, dtype=np.float64) |
||
56 | self.fgen = fgen |
||
57 | |||
58 | self.dvalues = np.array(dvalues, dtype=np.float64) if dvalues is not None else None |
||
59 | self.synth = None |
||
60 | |||
61 | # TODO make attributes below specific of OU method / not the entire class |
||
62 | self.OU_theta = None |
||
63 | self.OU_mu = None |
||
64 | self.OU_sigma = None |
||
65 | |||
66 | def plot(self, ax=None): |
||
67 | if ax is None: |
||
68 | ax = plt.gca() |
||
69 | return ax.plot(self.times, self.values, ".-", lw=1, alpha=0.7) |
||
70 | |||
71 | def gen_synth(self, samples): |
||
72 | """Generate synthethic light curves for this signal. |
||
73 | |||
74 | Generated curves are saved in `self.synth`. The times used are the same |
||
75 | of this signal. |
||
76 | |||
77 | Parameters |
||
78 | ---------- |
||
79 | samples: :int: number of samples to generate. |
||
80 | |||
81 | """ |
||
82 | |||
83 | self.synth = np.empty((samples, self.times.size)) |
||
84 | for n in range(samples): |
||
85 | self.synth[n] = fgen_wrapper(fgen=self.fgen, t=self.times, y=self.values, |
||
86 | fgen_params={'theta':self.OU_theta, 'mu':self.OU_mu, 'sigma':self.OU_sigma}) |
||
0 ignored issues
–
show
Coding Style
introduced
by
![]() |
|||
87 | |||
88 | |||
89 | def OU_fit(self, bins=None, rang=None, a=1e-5, b=100): |
||
0 ignored issues
–
show
|
|||
90 | """Fit the signal to an OU stochastic process, using several statistical approaches. |
||
91 | |||
92 | This function tries to fit the signal to an OU stochastic |
||
93 | process using both basic curve fitting and the Maximum |
||
94 | Likelihood Estimation method, and returns some plots of |
||
95 | the signals and its properties, and the estimated parameters. |
||
96 | """ |
||
97 | |||
98 | # TODO: make a generic fit method |
||
99 | res = dict() |
||
100 | |||
101 | y = self.values |
||
102 | t = self.times |
||
103 | |||
104 | dy = np.diff(y) |
||
105 | dt = np.diff(t) |
||
106 | |||
107 | # estimate sigma |
||
108 | try: |
||
109 | sigma_est = (np.nanmean(dy ** 2 / y[:-1] ** 2 / dt)) ** 0.5 |
||
110 | res["sigma_est"] = sigma_est |
||
111 | except Exception as e: |
||
0 ignored issues
–
show
Catching very general exceptions such as
Exception is usually not recommended.
Generally, you would want to handle very specific errors in the exception handler. This ensure that you do not hide other types of errors which should be fixed. So, unless you specifically plan to handle any error, consider adding a more specific exception. ![]() |
|||
112 | log.error(f"Could not estimate sigma: {e}") |
||
0 ignored issues
–
show
|
|||
113 | |||
114 | # plot histogram |
||
115 | if bins is None: |
||
116 | bins = int(y.size ** 0.5 / 1.5) # bins='auto' |
||
117 | if rang is None: |
||
118 | rang = (np.percentile(y, 0), np.percentile(y, 99)) |
||
119 | |||
120 | p, x = np.histogram(y, density=True, bins=bins, range=rang) # bins='sqrt') |
||
121 | x = (x + np.roll(x, -1))[:-1] / 2.0 |
||
122 | |||
123 | plt.subplots() |
||
124 | |||
125 | plt.hist(y, density=True, alpha=0.75, bins=bins, range=rang) |
||
126 | plt.plot(x, p, "r-", alpha=0.5) |
||
127 | |||
128 | anchored_text = AnchoredText( |
||
129 | f"mean {np.mean(y):.2g} \n " |
||
130 | f"median {np.median(y):.2g} \n " |
||
131 | f"mode {scipy_stats.mode(y)[0]:.2g} \n " |
||
132 | f"std {np.std(y):.2g} \n " |
||
133 | f"var {np.var(y):.2g}", |
||
134 | loc="upper right", |
||
135 | ) |
||
136 | plt.gca().add_artist(anchored_text) |
||
137 | |||
138 | # curve_fit |
||
139 | try: |
||
140 | popt, pcov = scipy_optimize.curve_fit(f=self.pdf, xdata=x, ydata=p) |
||
141 | # print('curve_fit: (l, mu)') |
||
142 | # print('popt: ') |
||
143 | # print(popt) |
||
144 | # print('pcov: ') |
||
145 | # print(np.sqrt(np.diag(pcov))) |
||
146 | x_c = np.linspace(1e-5, 1.1 * np.max(x), 1000) |
||
147 | plt.plot(x_c, self.pdf(x_c, *popt), "k-", label="curve_fit", alpha=0.8) |
||
148 | res["curve_fit"] = (popt, np.sqrt(np.diag(pcov))) |
||
149 | except Exception as e: |
||
0 ignored issues
–
show
Catching very general exceptions such as
Exception is usually not recommended.
Generally, you would want to handle very specific errors in the exception handler. This ensure that you do not hide other types of errors which should be fixed. So, unless you specifically plan to handle any error, consider adding a more specific exception. ![]() |
|||
150 | log.error(f"Some error fitting with curve_fit {e}") |
||
0 ignored issues
–
show
|
|||
151 | |||
152 | # fit pdf with MLE |
||
153 | # TODO: place outside as a helper class |
||
154 | # mu here is different than self.mu |
||
155 | class OU(scipy_stats.rv_continuous): |
||
156 | def _pdf(self, x, l, mu): |
||
157 | return ( |
||
158 | (l * mu) ** (1 + l) |
||
159 | / scipy_special.gamma(1 + l) |
||
160 | * np.exp(-l * mu / x) |
||
161 | / x ** (l + 2) |
||
162 | ) |
||
163 | |||
164 | try: |
||
165 | fit = OU(a=a, b=np.percentile(y, b)).fit(y, 1, 1, floc=0, fscale=1) |
||
166 | # print('MLE fit: (l, mu)') |
||
167 | # print(fit) |
||
168 | x_c = np.linspace(0, 1.1 * np.max(x), 1000) |
||
169 | plt.plot(x_c, self.pdf(x_c, fit[0], fit[1]), "k-.", label="MLE", alpha=0.8) |
||
170 | res["MLE_fit"] = fit[:-2] |
||
171 | except Exception as e: |
||
0 ignored issues
–
show
Catching very general exceptions such as
Exception is usually not recommended.
Generally, you would want to handle very specific errors in the exception handler. This ensure that you do not hide other types of errors which should be fixed. So, unless you specifically plan to handle any error, consider adding a more specific exception. ![]() |
|||
172 | log.error(f"Some error fitting with MLE {e}") |
||
0 ignored issues
–
show
|
|||
173 | |||
174 | plt.legend(loc="lower right") |
||
175 | # plt.show() |
||
176 | |||
177 | # estimate theta (from curve_fit) |
||
178 | try: |
||
179 | res["th_est1"] = fit[0] * sigma_est ** 2 / 2 |
||
180 | except NameError as e: |
||
181 | res["th_est1"] = None |
||
182 | |||
183 | # estimate theta (from MLE) |
||
184 | try: |
||
185 | res["th_est2"] = popt[0] * sigma_est ** 2 / 2 |
||
186 | except NameError as e: |
||
187 | res["th_est2"] = None |
||
188 | |||
189 | return res |
||
190 | |||
191 | |||
192 | def check_gen(self, fgen=None, fgen_params=None, fpsd="lombscargle", **axes): |
||
0 ignored issues
–
show
|
|||
193 | """Check the generation of synthetic signals. |
||
194 | |||
195 | This function checks the generation of a synthetic light curve. |
||
196 | |||
197 | Parameters |
||
198 | ---------- |
||
199 | fgen: :str: |
||
200 | A valid fgen method name. |
||
201 | fgen_params: :dict: |
||
202 | Parameters for fgen (e.g. for fgen='lc_gen_ou', a dict containing |
||
203 | values for theta, mu and sigma). |
||
204 | |||
205 | Returns |
||
206 | ------- |
||
207 | axes: :tuple: |
||
208 | Tuple of three axes, on which: |
||
209 | - The first plot show both the original signal and the synthetic one. |
||
210 | - The second plot shows the histogram of the values taken by both signals. |
||
211 | - The third plot shows their PSD. |
||
212 | """ |
||
213 | |||
214 | t, y = self.times, self.values |
||
215 | |||
216 | y2 = fgen_wrapper(fgen=fgen, t=t, y=y, fgen_params=fgen_params) |
||
217 | |||
218 | print(f"Original vs Synthethic:", |
||
0 ignored issues
–
show
|
|||
219 | f"mean: {np.mean(y)} / {np.mean(y2)}", |
||
220 | f"std: {np.std(y)} / {np.std(y2)}", sep='\n') |
||
221 | |||
222 | if ("ax1" not in axes) or ("ax2" not in axes) or ("ax3" not in axes): |
||
223 | fig, (axes["ax1"], axes["ax2"], axes["ax3"]) = plt.subplots( |
||
0 ignored issues
–
show
|
|||
224 | nrows=1, ncols=3, figsize=(20, 4) |
||
225 | ) |
||
226 | |||
227 | # plot the two signals |
||
228 | axes["ax1"].plot(t, y, "b-", label="orig", lw=0.5, alpha=0.8) |
||
229 | |||
230 | # ax1p = ax1.twinx() |
||
231 | axes["ax1"].plot(t, y2, "r-", label="gen", lw=0.5, alpha=0.8) |
||
232 | axes["ax1"].set_title("light curves") |
||
233 | |||
234 | # plot their histogram |
||
235 | bins = "auto" # bins = int(y.size**0.5/1.5) # |
||
236 | rang = (np.percentile(y, 0), np.percentile(y, 99)) |
||
237 | axes["ax2"].hist(y, density=True, color="b", alpha=0.4, bins=bins, range=rang) |
||
238 | |||
239 | # ax2p = ax2.twinx() |
||
240 | bins = "auto" # bins = int(y.size**0.5/1.5) # |
||
241 | rang = (np.percentile(y2, 0), np.percentile(y2, 99)) |
||
242 | axes["ax2"].hist(y2, density=True, color="r", alpha=0.4, bins=bins, range=rang) |
||
243 | |||
244 | axes["ax2"].set_title("pdf") |
||
245 | |||
246 | # plot their PSD |
||
247 | if fpsd == "lombscargle": |
||
248 | k = np.linspace(1e-3, self.values.size / 2, self.values.size // 2) |
||
249 | freqs = k / 2 / np.pi |
||
250 | |||
251 | pxx = scipy_signal.lombscargle(t, y, freqs, normalize=True) |
||
252 | axes["ax3"].plot(freqs, pxx, "b-", lw=1, alpha=0.5) |
||
253 | |||
254 | pxx2 = scipy_signal.lombscargle(t, y2, freqs, normalize=True) |
||
255 | axes["ax3"].plot(freqs, pxx2, "r-", lw=1, alpha=0.5) |
||
256 | |||
257 | axes["ax3"].set_xscale("log") |
||
258 | # ax3.set_yscale('log') |
||
259 | else: |
||
260 | axes["ax3"].psd(y, color="b", lw=1, alpha=0.5) |
||
261 | |||
262 | ax3p = axes["ax3"].twinx() |
||
263 | ax3p.psd(y2, color="r", lw=1, alpha=0.5) |
||
264 | |||
265 | axes["ax3"].set_title("PSD") |
||
266 | |||
267 | return axes |
||
268 | |||
269 | |||
270 | |||
271 | @staticmethod |
||
272 | def pdf(xx, ll, mu): |
||
273 | """Helper func to fit pdf as a curve.""" |
||
274 | return ( |
||
275 | (ll * mu) ** (1 + ll) |
||
276 | / scipy_special.gamma(1 + ll) |
||
277 | * np.exp(-ll * mu / xx) |
||
278 | / xx ** (ll + 2) |
||
279 | ) |
||
280 |