1
|
|
|
import os |
|
|
|
|
2
|
|
|
import collections |
3
|
|
|
import numpy as np |
4
|
|
|
|
5
|
|
|
from astromodels import PointSource |
6
|
|
|
from ..psf_fast import PSFInterpolator |
7
|
|
|
from ..interpolation.log_log_interpolation import LogLogInterpolator |
8
|
|
|
|
9
|
|
|
from threeML.exceptions.custom_exceptions import custom_warnings |
|
|
|
|
10
|
|
|
|
11
|
|
|
|
12
|
|
|
class ConvolvedPointSource(object): |
|
|
|
|
13
|
|
|
|
14
|
|
|
def __init__(self, source, response, flat_sky_projection): |
15
|
|
|
|
16
|
|
|
assert isinstance(source, PointSource) |
17
|
|
|
|
18
|
|
|
self._source = source |
19
|
|
|
|
20
|
|
|
# Get name |
21
|
|
|
self._name = self._source.name |
22
|
|
|
|
23
|
|
|
self._response = response |
24
|
|
|
|
25
|
|
|
self._flat_sky_projection = flat_sky_projection |
26
|
|
|
|
27
|
|
|
# This will store the position of the source |
28
|
|
|
# right now, let's use a fake value so that at the first iteration the source maps will be filled |
29
|
|
|
# (see get_expected_signal_per_transit) |
30
|
|
|
self._last_processed_position = (-999, -999) |
31
|
|
|
|
32
|
|
|
self._response_energy_bins = None |
33
|
|
|
self._psf_interpolators = None |
34
|
|
|
|
35
|
|
|
@property |
36
|
|
|
def name(self): |
|
|
|
|
37
|
|
|
|
38
|
|
|
return self._name |
39
|
|
|
|
40
|
|
|
def _update_dec_bins(self, dec_src): |
41
|
|
|
|
42
|
|
|
if abs(dec_src - self._last_processed_position[1]) > 0.1: |
43
|
|
|
|
44
|
|
|
# Source moved by more than 0.1 deg, let's recompute the response and the PSF |
45
|
|
|
self._response_energy_bins = self._response.get_response_dec_bin(dec_src, interpolate=True) |
46
|
|
|
|
47
|
|
|
# Setup the PSF interpolators |
48
|
|
|
self._psf_interpolators = collections.OrderedDict() |
49
|
|
|
for bin_id in self._response_energy_bins: |
50
|
|
|
self._psf_interpolators[bin_id] = PSFInterpolator(self._response_energy_bins[bin_id].psf, |
51
|
|
|
self._flat_sky_projection) |
52
|
|
|
|
53
|
|
|
def get_source_map(self, response_bin_id, tag=None, integrate=False, psf_integration_method='fast'): |
|
|
|
|
54
|
|
|
|
55
|
|
|
# Get current point source position |
56
|
|
|
# NOTE: this might change if the point source position is free during the fit, |
57
|
|
|
# that's why it is here |
58
|
|
|
|
59
|
|
|
ra_src, dec_src = self._source.position.ra.value, self._source.position.dec.value |
60
|
|
|
|
61
|
|
|
if (ra_src, dec_src) != self._last_processed_position: |
62
|
|
|
|
63
|
|
|
# Position changed (or first iteration), let's update the dec bins |
64
|
|
|
self._update_dec_bins(dec_src) |
65
|
|
|
|
66
|
|
|
self._last_processed_position = (ra_src, dec_src) |
67
|
|
|
|
68
|
|
|
# Get the current response bin |
69
|
|
|
response_energy_bin = self._response_energy_bins[response_bin_id] |
70
|
|
|
psf_interpolator = self._psf_interpolators[response_bin_id] |
71
|
|
|
|
72
|
|
|
# Get the PSF image |
73
|
|
|
# This is cached inside the PSF class, so that if the position doesn't change this line |
74
|
|
|
# is very fast |
75
|
|
|
this_map = psf_interpolator.point_source_image(ra_src, dec_src, psf_integration_method) |
76
|
|
|
|
77
|
|
|
# Check that the point source image is entirely contained in the ROI, if not print a warning |
78
|
|
|
map_sum = this_map.sum() |
79
|
|
|
|
80
|
|
|
if not np.isclose(map_sum, 1.0, rtol=1e-2): |
81
|
|
|
|
82
|
|
|
custom_warnings.warn("PSF for source %s is not entirely contained " |
83
|
|
|
"in ROI for response bin %s. Fraction is %.2f instead of 1.0. " |
84
|
|
|
"Consider enlarging your model ROI." % (self._name, |
85
|
|
|
response_bin_id, |
86
|
|
|
map_sum)) |
87
|
|
|
|
88
|
|
|
# Compute the fluxes from the spectral function at the same energies as the simulated function |
89
|
|
|
energy_centers_keV = response_energy_bin.sim_energy_bin_centers * 1e9 # astromodels expects energies in keV |
|
|
|
|
90
|
|
|
|
91
|
|
|
# This call needs to be here because the parameters of the model might change, |
92
|
|
|
# for example during a fit |
93
|
|
|
|
94
|
|
|
source_diff_spectrum = self._source(energy_centers_keV, tag=tag) |
95
|
|
|
|
96
|
|
|
# This provide a way to force the use of integration, for testing |
97
|
|
|
if os.environ.get("HAL_INTEGRATE_POINT_SOURCE", '').lower() == 'yes': # pragma: no cover |
98
|
|
|
|
99
|
|
|
integrate = True |
100
|
|
|
|
101
|
|
|
if integrate: # pragma: no cover |
102
|
|
|
|
103
|
|
|
# Slower approach, where we integrate the spectra of both the simulation and the source |
104
|
|
|
# It is off by default because it is slower and it doesn't provide any improvement in accuracy |
105
|
|
|
# according to my simulations (GV) |
106
|
|
|
|
107
|
|
|
interp_spectrum = LogLogInterpolator(response_energy_bin.sim_energy_bin_centers, |
108
|
|
|
source_diff_spectrum * 1e9, |
109
|
|
|
k=2) |
110
|
|
|
|
111
|
|
|
interp_sim_spectrum = LogLogInterpolator(response_energy_bin.sim_energy_bin_centers, |
112
|
|
|
response_energy_bin.sim_differential_photon_fluxes, |
113
|
|
|
k=2) |
114
|
|
|
|
115
|
|
|
src_spectrum = [interp_spectrum.integral(a, b) for (a, b) in zip(response_energy_bin.sim_energy_bin_low, |
116
|
|
|
response_energy_bin.sim_energy_bin_hi)] |
117
|
|
|
|
118
|
|
|
sim_spectrum = [interp_sim_spectrum.integral(a, b) for (a, b) in zip(response_energy_bin.sim_energy_bin_low, |
119
|
|
|
response_energy_bin.sim_energy_bin_hi)] |
120
|
|
|
|
121
|
|
|
scale = np.array(src_spectrum) / np.array(sim_spectrum) |
122
|
|
|
|
123
|
|
|
else: |
124
|
|
|
|
125
|
|
|
# Transform from keV^-1 cm^-2 s^-1 to TeV^-1 cm^-2 s^-1 and re-weight the detected counts |
126
|
|
|
scale = source_diff_spectrum / response_energy_bin.sim_differential_photon_fluxes * 1e9 |
127
|
|
|
|
128
|
|
|
# Now return the map multiplied by the scale factor |
129
|
|
|
return np.sum(scale * response_energy_bin.sim_signal_events_per_bin) * this_map |
130
|
|
|
|
The coding style of this project requires that you add a docstring to this code element. Below, you find an example for methods:
If you would like to know more about docstrings, we recommend to read PEP-257: Docstring Conventions.