1
|
|
|
# Licensed under a 3-clause BSD style license - see LICENSE |
2
|
|
|
"""Synthetic generation of light curves.""" |
3
|
|
|
|
4
|
|
|
import logging |
5
|
|
|
|
6
|
|
|
import matplotlib.pyplot as plt |
|
|
|
|
7
|
|
|
import numpy as np |
|
|
|
|
8
|
|
|
import scipy.optimize as scipy_optimize |
|
|
|
|
9
|
|
|
import scipy.signal as scipy_signal |
|
|
|
|
10
|
|
|
import scipy.special as scipy_special |
|
|
|
|
11
|
|
|
import scipy.stats as scipy_stats |
|
|
|
|
12
|
|
|
from matplotlib.offsetbox import AnchoredText |
|
|
|
|
13
|
|
|
|
14
|
|
|
from mutis.lib.signal import * |
|
|
|
|
15
|
|
|
|
16
|
|
|
__all__ = ["Signal"] |
17
|
|
|
|
18
|
|
|
log = logging.getLogger(__name__) |
19
|
|
|
|
20
|
|
|
|
21
|
|
|
class Signal: |
22
|
|
|
"""Analysis and generation of a signal. |
23
|
|
|
|
24
|
|
|
Description goes here. |
25
|
|
|
|
26
|
|
|
Parameters |
27
|
|
|
---------- |
28
|
|
|
times : :class:`numpy.ndarray` or :class:`pandas.Series` |
29
|
|
|
Values of the time axis. |
30
|
|
|
values : :class:`numpy.ndarray` or :class:`pandas.Series` |
31
|
|
|
Values of the signal axis. |
32
|
|
|
fgen : :class:`str` |
33
|
|
|
Method used to generate the synthetic signal. |
34
|
|
|
""" |
35
|
|
|
|
36
|
|
|
def __init__(self, times, values, fgen): |
37
|
|
|
self.times = np.array(times) |
38
|
|
|
self.values = np.array(values) |
39
|
|
|
self.fgen = fgen |
40
|
|
|
self.synth = None |
41
|
|
|
|
42
|
|
|
# TODO make attributes below specific of OU method / not the entire class |
|
|
|
|
43
|
|
|
self.OU_theta = None |
|
|
|
|
44
|
|
|
self.OU_mu = None |
|
|
|
|
45
|
|
|
self.OU_sigma = None |
|
|
|
|
46
|
|
|
|
47
|
|
|
def plot(self, ax=None): |
|
|
|
|
48
|
|
|
if ax is None: |
49
|
|
|
ax = plt.gca() |
50
|
|
|
|
51
|
|
|
return ax.plot(self.times, self.values, ".-", lw=1, alpha=0.7) |
52
|
|
|
|
53
|
|
|
def gen_synth(self, samples): |
54
|
|
|
"""Description goes here.""" |
55
|
|
|
|
56
|
|
|
self.synth = np.empty((samples, self.times.size)) |
57
|
|
|
for n in range(samples): |
|
|
|
|
58
|
|
|
if self.fgen == "lc_gen_samp": |
59
|
|
|
self.synth[n] = lc_gen_samp(self.values) |
60
|
|
|
elif self.fgen == "lc_gen_psd_fft": |
61
|
|
|
self.synth[n] = lc_gen_psd_fft(self.values) |
62
|
|
|
elif self.fgen == "lc_gen_psd_nft": |
63
|
|
|
self.synth[n] = lc_gen_psd_nft(self.times, self.values) |
64
|
|
|
elif self.fgen == "lc_gen_psd_lombscargle": |
65
|
|
|
self.synth[n] = lc_gen_psd_lombscargle(self.times, self.values) |
66
|
|
|
elif self.fgen == "lc_gen_psd_c": |
67
|
|
|
self.synth[n] = lc_gen_psd_c(self.times, self.values, self.times) |
68
|
|
|
elif self.fgen == "lc_gen_ou": |
69
|
|
|
if self.OU_theta is None or self.OU_mu is None or self.OU_sigma is None: |
70
|
|
|
raise Exception("You need to set the parameters for the signal") |
71
|
|
|
self.synth[n] = lc_gen_ou(self.OU_theta, self.OU_mu, self.OU_sigma, self.times) |
72
|
|
|
else: |
73
|
|
|
raise Exception(f"Unknown fgen method {self.fgen}") |
74
|
|
|
|
75
|
|
|
def OU_fit(self, bins=None, rang=None, a=1e-5, b=100): |
|
|
|
|
76
|
|
|
"""Fit the signal to an OU stochastic process, using several statistical approaches. |
77
|
|
|
|
78
|
|
|
This function tries to fit the signal to an OU stochastic |
79
|
|
|
process using both basic curve fitting and the Maximum |
80
|
|
|
Likelihood Estimation method, and returns some plots of |
81
|
|
|
the signals and its properties, and the estimated parameters. |
82
|
|
|
""" |
83
|
|
|
|
84
|
|
|
# TODO: make a generic fit method |
|
|
|
|
85
|
|
|
res = dict() |
86
|
|
|
|
87
|
|
|
y = self.values |
|
|
|
|
88
|
|
|
t = self.times |
|
|
|
|
89
|
|
|
|
90
|
|
|
dy = np.diff(y) |
|
|
|
|
91
|
|
|
dt = np.diff(t) |
|
|
|
|
92
|
|
|
|
93
|
|
|
# estimate sigma |
94
|
|
|
try: |
95
|
|
|
sigma_est = (np.nanmean(dy ** 2 / y[:-1] ** 2 / dt)) ** 0.5 |
96
|
|
|
res["sigma_est"] = sigma_est |
97
|
|
|
except Exception as e: |
|
|
|
|
98
|
|
|
log.error(f"Could not estimate sigma: {e}") |
|
|
|
|
99
|
|
|
|
100
|
|
|
# plot histogram |
101
|
|
|
if bins is None: |
102
|
|
|
bins = np.int(y.size ** 0.5 / 1.5) # bins='auto' |
103
|
|
|
if rang is None: |
104
|
|
|
rang = (np.percentile(y, 0), np.percentile(y, 99)) |
105
|
|
|
|
106
|
|
|
p, x = np.histogram(y, density=True, bins=bins, range=rang) # bins='sqrt') |
|
|
|
|
107
|
|
|
x = (x + np.roll(x, -1))[:-1] / 2.0 |
|
|
|
|
108
|
|
|
|
109
|
|
|
plt.subplots() |
110
|
|
|
|
111
|
|
|
plt.hist(y, density=True, alpha=0.75, bins=bins, range=rang) |
112
|
|
|
plt.plot(x, p, "r-", alpha=0.5) |
113
|
|
|
|
114
|
|
|
anchored_text = AnchoredText( |
115
|
|
|
f"mean {np.mean(y):.2g} \n " |
116
|
|
|
f"median {np.median(y):.2g} \n " |
117
|
|
|
f"mode {scipy_stats.mode(y)[0][0]:.2g} \n " |
118
|
|
|
f"std {np.std(y):.2g} \n " |
119
|
|
|
f"var {np.var(y):.2g}", |
120
|
|
|
loc="upper right", |
121
|
|
|
) |
122
|
|
|
plt.gca().add_artist(anchored_text) |
123
|
|
|
|
124
|
|
|
# curve_fit |
125
|
|
|
try: |
126
|
|
|
popt, pcov = scipy_optimize.curve_fit(f=self.pdf, xdata=x, ydata=p) |
127
|
|
|
# print('curve_fit: (l, mu)') |
128
|
|
|
# print('popt: ') |
129
|
|
|
# print(popt) |
130
|
|
|
# print('pcov: ') |
131
|
|
|
# print(np.sqrt(np.diag(pcov))) |
132
|
|
|
x_c = np.linspace(1e-5, 1.1 * np.max(x), 1000) |
133
|
|
|
plt.plot(x_c, self.pdf(x_c, *popt), "k-", label="curve_fit", alpha=0.8) |
134
|
|
|
res["curve_fit"] = (popt, np.sqrt(np.diag(pcov))) |
135
|
|
|
except Exception as e: |
|
|
|
|
136
|
|
|
log.error(f"Some error fitting with curve_fit {e}") |
|
|
|
|
137
|
|
|
|
138
|
|
|
# fit pdf with MLE |
139
|
|
|
# TODO: place outside as a helper class |
|
|
|
|
140
|
|
|
# mu here is different than self.mu |
141
|
|
|
class OU(scipy_stats.rv_continuous): |
|
|
|
|
142
|
|
|
def _pdf(self, x, l, mu): |
|
|
|
|
143
|
|
|
return (l * mu) ** (1 + l) / scipy_special.gamma(1 + l) * np.exp(-l * mu / x) / x ** (l + 2) |
|
|
|
|
144
|
|
|
|
145
|
|
|
try: |
146
|
|
|
fit = OU(a=a, b=np.percentile(y, b)).fit(y, 1, 1, floc=0, fscale=1) |
147
|
|
|
# print('MLE fit: (l, mu)') |
148
|
|
|
# print(fit) |
149
|
|
|
x_c = np.linspace(0, 1.1 * np.max(x), 1000) |
150
|
|
|
plt.plot(x_c, self.pdf(x_c, fit[0], fit[1]), "k-.", label="MLE", alpha=0.8) |
151
|
|
|
res["MLE_fit"] = fit[:-2] |
152
|
|
|
except Exception as e: |
|
|
|
|
153
|
|
|
log.error(f"Some error fitting with MLE {e}") |
|
|
|
|
154
|
|
|
|
155
|
|
|
plt.legend(loc="lower right") |
156
|
|
|
|
157
|
|
|
plt.show() |
158
|
|
|
|
159
|
|
|
# estimate theta (from curve_fit) |
160
|
|
|
try: |
161
|
|
|
res["th_est1"] = fit[0] * sigma_est ** 2 / 2 |
162
|
|
|
except NameError as e: |
|
|
|
|
163
|
|
|
res["th_est1"] = None |
164
|
|
|
|
165
|
|
|
# estimate theta (from MLE) |
166
|
|
|
try: |
167
|
|
|
res["th_est2"] = popt[0] * sigma_est ** 2 / 2 |
168
|
|
|
except NameError as e: |
|
|
|
|
169
|
|
|
res["th_est2"] = None |
170
|
|
|
|
171
|
|
|
return res |
172
|
|
|
|
173
|
|
|
def OU_check_gen(self, theta, mu, sigma, fpsd="lombscargle", **axes): |
|
|
|
|
174
|
|
|
"""Check the generation of a synthetic signal with given OU parameters. |
175
|
|
|
|
176
|
|
|
This function checks the generation of a synthetic light curve through |
177
|
|
|
an Orstein-Uhlenbeck process with given `theta`, `mu` and `sigma`, to |
178
|
|
|
ease the discovery of the most suitable parameters to be used in the |
179
|
|
|
generation of the synthetic light curves. |
180
|
|
|
|
181
|
|
|
It returns three plots, on which: |
182
|
|
|
- The first plot show both the original signal and the synthetic one. |
183
|
|
|
- The second plot shows the histogram of the values taken by both signals. |
184
|
|
|
- The third plot shows their PSD. |
185
|
|
|
""" |
186
|
|
|
# TODO make a generic check_gen method |
|
|
|
|
187
|
|
|
|
188
|
|
|
t, y = self.times, self.values |
|
|
|
|
189
|
|
|
y2 = lc_gen_ou(theta, mu, sigma, self.times) # , scale=np.std(self.s), loc=np.mean(self.s)) |
|
|
|
|
190
|
|
|
|
191
|
|
|
if ("ax1" not in axes) or ("ax2" not in axes) or ("ax3" not in axes): |
192
|
|
|
fig, (axes["ax1"], axes["ax2"], axes["ax3"]) = plt.subplots(nrows=1, ncols=3, figsize=(20, 4)) |
|
|
|
|
193
|
|
|
|
194
|
|
|
# Plot the two signals |
195
|
|
|
|
196
|
|
|
axes["ax1"].plot(t, y, "b-", label="orig", lw=0.5, alpha=0.8) |
197
|
|
|
|
198
|
|
|
# ax1p = ax1.twinx() |
199
|
|
|
axes["ax1"].plot(t, y2, "r-", label="gen", lw=0.5, alpha=0.8) |
200
|
|
|
axes["ax1"].set_title("light curves") |
201
|
|
|
|
202
|
|
|
# Plot their histogram |
203
|
|
|
|
204
|
|
|
bins = "auto" # bins = np.int(y.size**0.5/1.5) # |
205
|
|
|
rang = (np.percentile(y, 0), np.percentile(y, 99)) |
206
|
|
|
axes["ax2"].hist(y, density=True, color="b", alpha=0.4, bins=bins, range=rang) |
207
|
|
|
|
208
|
|
|
# ax2p = ax2.twinx() |
209
|
|
|
bins = "auto" # bins = np.int(y.size**0.5/1.5) # |
210
|
|
|
rang = (np.percentile(y2, 0), np.percentile(y2, 99)) |
211
|
|
|
axes["ax2"].hist(y2, density=True, color="r", alpha=0.4, bins=bins, range=rang) |
212
|
|
|
|
213
|
|
|
axes["ax2"].set_title("pdf") |
214
|
|
|
|
215
|
|
|
# Plot their PSD |
216
|
|
|
|
217
|
|
|
if fpsd == "lombscargle": |
218
|
|
|
k = np.linspace(1e-3, self.values.size / 2, self.values.size // 2) |
219
|
|
|
freqs = k / 2 / np.pi |
220
|
|
|
|
221
|
|
|
pxx = scipy_signal.lombscargle(t, y, freqs, normalize=True) |
222
|
|
|
axes["ax3"].plot(freqs, pxx, "b-", lw=1, alpha=0.5) |
223
|
|
|
|
224
|
|
|
pxx2 = scipy_signal.lombscargle(t, y2, freqs, normalize=True) |
225
|
|
|
axes["ax3"].plot(freqs, pxx2, "r-", lw=1, alpha=0.5) |
226
|
|
|
|
227
|
|
|
axes["ax3"].set_xscale("log") |
228
|
|
|
# ax3.set_yscale('log') |
229
|
|
|
else: |
230
|
|
|
axes["ax3"].psd(y, color="b", lw=1, alpha=0.5) |
231
|
|
|
|
232
|
|
|
ax3p = axes["ax3"].twinx() |
233
|
|
|
ax3p.psd(y2, color="r", lw=1, alpha=0.5) |
234
|
|
|
|
235
|
|
|
axes["ax3"].set_title("PSD") |
236
|
|
|
|
237
|
|
|
return axes |
238
|
|
|
|
239
|
|
|
def PSD_check_gen(self, fgen=None, ax=None): |
|
|
|
|
240
|
|
|
"""Check the generation of a synthetic signal with a given `fgen` method.""" |
241
|
|
|
# TODO: make a generic check_gen method |
|
|
|
|
242
|
|
|
|
243
|
|
|
if fgen is None: |
244
|
|
|
fgen = self.fgen |
245
|
|
|
if ax is None: |
246
|
|
|
ax = plt.gca() |
247
|
|
|
|
248
|
|
|
if fgen == "lc_gen_psd_fft": |
249
|
|
|
s2 = lc_gen_psd_fft(self.values) |
|
|
|
|
250
|
|
|
elif fgen == "lc_gen_psd_nft": |
251
|
|
|
s2 = lc_gen_psd_nft(self.times, self.values) |
|
|
|
|
252
|
|
|
elif fgen == "lc_gen_psd_lombscargle": |
253
|
|
|
s2 = lc_gen_psd_lombscargle(self.times, self.values) |
|
|
|
|
254
|
|
|
elif fgen == "lc_gen_psd_c": |
255
|
|
|
s2 = lc_gen_psd_c(self.times, self.values, self.times) |
|
|
|
|
256
|
|
|
else: |
257
|
|
|
raise Exception("No valid fgen specified") |
258
|
|
|
|
259
|
|
|
ax.plot(self.times, self.values, "b-", label="orig", lw=0.5, alpha=0.8) |
260
|
|
|
ax.plot(self.times, s2, "r-", label="gen", lw=0.5, alpha=0.8) |
261
|
|
|
ax.legend() |
262
|
|
|
|
263
|
|
|
@staticmethod |
264
|
|
|
def pdf(xx, ll, mu): |
|
|
|
|
265
|
|
|
"""Helper func to fit pdf as a curve.""" |
266
|
|
|
return (ll * mu) ** (1 + ll) / scipy_special.gamma(1 + ll) * np.exp(-ll * mu / xx) / xx ** (ll + 2) |
|
|
|
|
267
|
|
|
|