1 | # Licensed under a 3-clause BSD style license - see LICENSE |
||
2 | """Synthetic generation of light curves.""" |
||
3 | |||
4 | import logging |
||
5 | |||
6 | import matplotlib.pyplot as plt |
||
7 | import numpy as np |
||
8 | import scipy.optimize as scipy_optimize |
||
9 | import scipy.signal as scipy_signal |
||
10 | import scipy.special as scipy_special |
||
11 | import scipy.stats as scipy_stats |
||
12 | from matplotlib.offsetbox import AnchoredText |
||
13 | |||
14 | from mutis.lib.signal import * |
||
15 | |||
16 | __all__ = ["Signal"] |
||
17 | |||
18 | log = logging.getLogger(__name__) |
||
19 | |||
20 | |||
21 | class Signal: |
||
22 | """Analysis and generation of a signal. |
||
23 | |||
24 | Class for a generic signal. |
||
25 | |||
26 | Attributes |
||
27 | ---------- |
||
28 | times : :class:`numpy.ndarray` or :class:`pandas.Series` |
||
29 | Values of the time axis. |
||
30 | values : :class:`numpy.ndarray` or :class:`pandas.Series` |
||
31 | Values of the signal axis. |
||
32 | dvalues : :class:`numpy.ndarray` or :class:`pandas.Series` |
||
33 | Uncertainties on the values of the signal axis. |
||
34 | |||
35 | Other Attributes |
||
36 | ---------------- |
||
37 | |||
38 | fgen : :class:`str` |
||
39 | Method to generate the synthetic signal. |
||
40 | |||
41 | synth : :class:`numpy.ndarray` |
||
42 | Synthetic signals generated with `fgen`. |
||
43 | |||
44 | OU_theta : :class:`float` |
||
45 | (For OU method) Parameter theta of the OU process. |
||
46 | OU_mu : :class:`float` |
||
47 | (For OU method) Parameter mu of the OU process. |
||
48 | OU_sigma : :class:`float` |
||
49 | (For OU method) Parameter sigma of the OU process. |
||
50 | """ |
||
51 | |||
52 | def __init__(self, times, values, dvalues=None, fgen=None): |
||
53 | |||
54 | self.times = np.array(times, dtype=np.float64) |
||
55 | self.values = np.array(values, dtype=np.float64) |
||
56 | self.fgen = fgen |
||
57 | |||
58 | self.dvalues = np.array(dvalues, dtype=np.float64) if dvalues is not None else None |
||
59 | self.synth = None |
||
60 | |||
61 | # TODO make attributes below specific of OU method / not the entire class |
||
62 | self.OU_theta = None |
||
63 | self.OU_mu = None |
||
64 | self.OU_sigma = None |
||
65 | |||
66 | def plot(self, ax=None): |
||
67 | if ax is None: |
||
68 | ax = plt.gca() |
||
69 | return ax.plot(self.times, self.values, ".-", lw=1, alpha=0.7) |
||
70 | |||
71 | def gen_synth(self, samples): |
||
72 | """Generate synthethic light curves for this signal. |
||
73 | |||
74 | Generated curves are saved in `self.synth`. The times used are the same |
||
75 | of this signal. |
||
76 | |||
77 | Parameters |
||
78 | ---------- |
||
79 | samples: :int: number of samples to generate. |
||
80 | |||
81 | """ |
||
82 | |||
83 | self.synth = np.empty((samples, self.times.size)) |
||
84 | for n in range(samples): |
||
85 | self.synth[n] = fgen_wrapper(fgen=self.fgen, t=self.times, y=self.values, |
||
86 | fgen_params={'theta':self.OU_theta, 'mu':self.OU_mu, 'sigma':self.OU_sigma}) |
||
87 | |||
88 | |||
89 | def OU_fit(self, bins=None, rang=None, a=1e-5, b=100): |
||
90 | """Fit the signal to an OU stochastic process, using several statistical approaches. |
||
91 | |||
92 | This function tries to fit the signal to an OU stochastic |
||
93 | process using both basic curve fitting and the Maximum |
||
94 | Likelihood Estimation method, and returns some plots of |
||
95 | the signals and its properties, and the estimated parameters. |
||
96 | """ |
||
97 | |||
98 | # TODO: make a generic fit method |
||
99 | res = dict() |
||
100 | |||
101 | y = self.values |
||
102 | t = self.times |
||
103 | |||
104 | dy = np.diff(y) |
||
105 | dt = np.diff(t) |
||
106 | |||
107 | # estimate sigma |
||
108 | try: |
||
109 | sigma_est = (np.nanmean(dy ** 2 / y[:-1] ** 2 / dt)) ** 0.5 |
||
110 | res["sigma_est"] = sigma_est |
||
111 | except Exception as e: |
||
0 ignored issues
–
show
|
|||
112 | log.error(f"Could not estimate sigma: {e}") |
||
113 | |||
114 | # plot histogram |
||
115 | if bins is None: |
||
116 | bins = int(y.size ** 0.5 / 1.5) # bins='auto' |
||
117 | if rang is None: |
||
118 | rang = (np.percentile(y, 0), np.percentile(y, 99)) |
||
119 | |||
120 | p, x = np.histogram(y, density=True, bins=bins, range=rang) # bins='sqrt') |
||
121 | x = (x + np.roll(x, -1))[:-1] / 2.0 |
||
122 | |||
123 | plt.subplots() |
||
124 | |||
125 | plt.hist(y, density=True, alpha=0.75, bins=bins, range=rang) |
||
126 | plt.plot(x, p, "r-", alpha=0.5) |
||
127 | |||
128 | anchored_text = AnchoredText( |
||
129 | f"mean {np.mean(y):.2g} \n " |
||
130 | f"median {np.median(y):.2g} \n " |
||
131 | f"mode {scipy_stats.mode(y)[0]:.2g} \n " |
||
132 | f"std {np.std(y):.2g} \n " |
||
133 | f"var {np.var(y):.2g}", |
||
134 | loc="upper right", |
||
135 | ) |
||
136 | plt.gca().add_artist(anchored_text) |
||
137 | |||
138 | # curve_fit |
||
139 | try: |
||
140 | popt, pcov = scipy_optimize.curve_fit(f=self.pdf, xdata=x, ydata=p) |
||
141 | # print('curve_fit: (l, mu)') |
||
142 | # print('popt: ') |
||
143 | # print(popt) |
||
144 | # print('pcov: ') |
||
145 | # print(np.sqrt(np.diag(pcov))) |
||
146 | x_c = np.linspace(1e-5, 1.1 * np.max(x), 1000) |
||
147 | plt.plot(x_c, self.pdf(x_c, *popt), "k-", label="curve_fit", alpha=0.8) |
||
148 | res["curve_fit"] = (popt, np.sqrt(np.diag(pcov))) |
||
149 | except Exception as e: |
||
0 ignored issues
–
show
Catching very general exceptions such as
Exception is usually not recommended.
Generally, you would want to handle very specific errors in the exception handler. This ensure that you do not hide other types of errors which should be fixed. So, unless you specifically plan to handle any error, consider adding a more specific exception. ![]() |
|||
150 | log.error(f"Some error fitting with curve_fit {e}") |
||
151 | |||
152 | # fit pdf with MLE |
||
153 | # TODO: place outside as a helper class |
||
154 | # mu here is different than self.mu |
||
155 | class OU(scipy_stats.rv_continuous): |
||
156 | def _pdf(self, x, l, mu): |
||
157 | return ( |
||
158 | (l * mu) ** (1 + l) |
||
159 | / scipy_special.gamma(1 + l) |
||
160 | * np.exp(-l * mu / x) |
||
161 | / x ** (l + 2) |
||
162 | ) |
||
163 | |||
164 | try: |
||
165 | fit = OU(a=a, b=np.percentile(y, b)).fit(y, 1, 1, floc=0, fscale=1) |
||
166 | # print('MLE fit: (l, mu)') |
||
167 | # print(fit) |
||
168 | x_c = np.linspace(0, 1.1 * np.max(x), 1000) |
||
169 | plt.plot(x_c, self.pdf(x_c, fit[0], fit[1]), "k-.", label="MLE", alpha=0.8) |
||
170 | res["MLE_fit"] = fit[:-2] |
||
171 | except Exception as e: |
||
0 ignored issues
–
show
Catching very general exceptions such as
Exception is usually not recommended.
Generally, you would want to handle very specific errors in the exception handler. This ensure that you do not hide other types of errors which should be fixed. So, unless you specifically plan to handle any error, consider adding a more specific exception. ![]() |
|||
172 | log.error(f"Some error fitting with MLE {e}") |
||
173 | |||
174 | plt.legend(loc="lower right") |
||
175 | # plt.show() |
||
176 | |||
177 | # estimate theta (from curve_fit) |
||
178 | try: |
||
179 | res["th_est1"] = fit[0] * sigma_est ** 2 / 2 |
||
180 | except NameError as e: |
||
181 | res["th_est1"] = None |
||
182 | |||
183 | # estimate theta (from MLE) |
||
184 | try: |
||
185 | res["th_est2"] = popt[0] * sigma_est ** 2 / 2 |
||
186 | except NameError as e: |
||
187 | res["th_est2"] = None |
||
188 | |||
189 | return res |
||
190 | |||
191 | |||
192 | def check_gen(self, fgen=None, fgen_params=None, fpsd="lombscargle", **axes): |
||
193 | """Check the generation of synthetic signals. |
||
194 | |||
195 | This function checks the generation of a synthetic light curve. |
||
196 | |||
197 | Parameters |
||
198 | ---------- |
||
199 | fgen: :str: |
||
200 | A valid fgen method name. |
||
201 | fgen_params: :dict: |
||
202 | Parameters for fgen (e.g. for fgen='lc_gen_ou', a dict containing |
||
203 | values for theta, mu and sigma). |
||
204 | |||
205 | Returns |
||
206 | ------- |
||
207 | axes: :tuple: |
||
208 | Tuple of three axes, on which: |
||
209 | - The first plot show both the original signal and the synthetic one. |
||
210 | - The second plot shows the histogram of the values taken by both signals. |
||
211 | - The third plot shows their PSD. |
||
212 | """ |
||
213 | |||
214 | t, y = self.times, self.values |
||
215 | |||
216 | y2 = fgen_wrapper(fgen=fgen, t=t, y=y, fgen_params=fgen_params) |
||
217 | |||
218 | print(f"Original vs Synthethic:", |
||
219 | f"mean: {np.mean(y)} / {np.mean(y2)}", |
||
220 | f"std: {np.std(y)} / {np.std(y2)}", sep='\n') |
||
221 | |||
222 | if ("ax1" not in axes) or ("ax2" not in axes) or ("ax3" not in axes): |
||
223 | fig, (axes["ax1"], axes["ax2"], axes["ax3"]) = plt.subplots( |
||
224 | nrows=1, ncols=3, figsize=(20, 4) |
||
225 | ) |
||
226 | |||
227 | # plot the two signals |
||
228 | axes["ax1"].plot(t, y, "b-", label="orig", lw=0.5, alpha=0.8) |
||
229 | |||
230 | # ax1p = ax1.twinx() |
||
231 | axes["ax1"].plot(t, y2, "r-", label="gen", lw=0.5, alpha=0.8) |
||
232 | axes["ax1"].set_title("light curves") |
||
233 | |||
234 | # plot their histogram |
||
235 | bins = "auto" # bins = int(y.size**0.5/1.5) # |
||
236 | rang = (np.percentile(y, 0), np.percentile(y, 99)) |
||
237 | axes["ax2"].hist(y, density=True, color="b", alpha=0.4, bins=bins, range=rang) |
||
238 | |||
239 | # ax2p = ax2.twinx() |
||
240 | bins = "auto" # bins = int(y.size**0.5/1.5) # |
||
241 | rang = (np.percentile(y2, 0), np.percentile(y2, 99)) |
||
242 | axes["ax2"].hist(y2, density=True, color="r", alpha=0.4, bins=bins, range=rang) |
||
243 | |||
244 | axes["ax2"].set_title("pdf") |
||
245 | |||
246 | # plot their PSD |
||
247 | if fpsd == "lombscargle": |
||
248 | k = np.linspace(1e-3, self.values.size / 2, self.values.size // 2) |
||
249 | freqs = k / 2 / np.pi |
||
250 | |||
251 | pxx = scipy_signal.lombscargle(t, y, freqs, normalize=True) |
||
252 | axes["ax3"].plot(freqs, pxx, "b-", lw=1, alpha=0.5) |
||
253 | |||
254 | pxx2 = scipy_signal.lombscargle(t, y2, freqs, normalize=True) |
||
255 | axes["ax3"].plot(freqs, pxx2, "r-", lw=1, alpha=0.5) |
||
256 | |||
257 | axes["ax3"].set_xscale("log") |
||
258 | # ax3.set_yscale('log') |
||
259 | else: |
||
260 | axes["ax3"].psd(y, color="b", lw=1, alpha=0.5) |
||
261 | |||
262 | ax3p = axes["ax3"].twinx() |
||
263 | ax3p.psd(y2, color="r", lw=1, alpha=0.5) |
||
264 | |||
265 | axes["ax3"].set_title("PSD") |
||
266 | |||
267 | return axes |
||
268 | |||
269 | |||
270 | |||
271 | @staticmethod |
||
272 | def pdf(xx, ll, mu): |
||
273 | """Helper func to fit pdf as a curve.""" |
||
274 | return ( |
||
275 | (ll * mu) ** (1 + ll) |
||
276 | / scipy_special.gamma(1 + ll) |
||
277 | * np.exp(-ll * mu / xx) |
||
278 | / xx ** (ll + 2) |
||
279 | ) |
||
280 |
Generally, you would want to handle very specific errors in the exception handler. This ensure that you do not hide other types of errors which should be fixed.
So, unless you specifically plan to handle any error, consider adding a more specific exception.