hawc_hal.convolved_source.convolved_extended_source   A
last analyzed

Complexity

Total Complexity 18

Size/Duplication

Total Lines 202
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 82
dl 0
loc 202
rs 10
c 0
b 0
f 0
wmc 18

6 Methods

Rating   Name   Duplication   Size   Complexity  
A ConvolvedExtendedSource.get_source_map() 0 3 1
A ConvolvedExtendedSource._setup_callbacks() 0 16 4
A ConvolvedExtendedSource3D.__init__() 0 10 1
A ConvolvedExtendedSource3D.get_source_map() 0 61 4
A ConvolvedExtendedSource3D._parameter_change_callback() 0 9 1
B ConvolvedExtendedSource.__init__() 0 64 5

1 Function

Rating   Name   Duplication   Size   Complexity  
A _select_with_wrap_around() 0 13 2
1
import numpy as np
0 ignored issues
show
Coding Style introduced by
This module should have a docstring.

The coding style of this project requires that you add a docstring to this code element. Below, you find an example for methods:

class SomeClass:
    def some_method(self):
        """Do x and return foo."""

If you would like to know more about docstrings, we recommend to read PEP-257: Docstring Conventions.

Loading history...
2
3
from astromodels import use_astromodels_memoization
4
5
6
def _select_with_wrap_around(arr, start, stop, wrap=(360, 0)):
7
8
    if start > stop:
9
10
        # This can happen if for instance lon_start = 280 and lon_stop = 10, which means we
11
        # should keep between 280 and 360 and then between 0 to 10
12
        idx = ((arr >= stop) & (arr <= wrap[0])) | ((arr >= wrap[1]) & (arr <= start))
13
14
    else:
15
16
        idx = (arr >= start) & (arr <= stop)
17
18
    return idx
19
20
# Conversion factor between deg^2 and rad^2
21
deg2_to_rad2 = 0.00030461741978670857
0 ignored issues
show
Coding Style Naming introduced by
Constant name "deg2_to_rad2" doesn't conform to UPPER_CASE naming style ('(([A-Z_][A-Z0-9_]*)|(__.*__))$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
22
23
24
class ConvolvedExtendedSource(object):
0 ignored issues
show
Coding Style introduced by
This class should have a docstring.

The coding style of this project requires that you add a docstring to this code element. Below, you find an example for methods:

class SomeClass:
    def some_method(self):
        """Do x and return foo."""

If you would like to know more about docstrings, we recommend to read PEP-257: Docstring Conventions.

Loading history...
best-practice introduced by
Too many instance attributes (9/7)
Loading history...
25
26
    def __init__(self, source, response, flat_sky_projection):
0 ignored issues
show
Comprehensibility introduced by
This function exceeds the maximum number of variables (25/15).
Loading history...
27
28
        self._response = response
29
        self._flat_sky_projection = flat_sky_projection
30
31
        # Get name
32
        self._name = source.name
33
34
        self._source = source
35
36
        # Find out the response bins we need to consider for this extended source
37
38
        # # Get the footprint (i..e, the coordinates of the 4 points limiting the projections)
39
        (ra1, dec1), (ra2, dec2), (ra3, dec3), (ra4, dec4) = flat_sky_projection.wcs.calc_footprint()
0 ignored issues
show
Unused Code introduced by
The variable ra4 seems to be unused.
Loading history...
Unused Code introduced by
The variable ra2 seems to be unused.
Loading history...
Unused Code introduced by
The variable ra3 seems to be unused.
Loading history...
Unused Code introduced by
The variable ra1 seems to be unused.
Loading history...
40
41
        (lon_start, lon_stop), (lat_start, lat_stop) = source.get_boundaries()
42
43
        # Figure out maximum and minimum declination to be covered
44
        dec_min = max(min([dec1, dec2, dec3, dec4]), lat_start)
45
        dec_max = min(max([dec1, dec2, dec3, dec4]), lat_stop)
46
47
        # Get the defined dec bins lower edges
48
        lower_edges = np.array(map(lambda x: x[0], response.dec_bins))
0 ignored issues
show
introduced by
map/filter on lambda could be replaced by comprehension
Loading history...
49
        upper_edges = np.array(map(lambda x: x[-1], response.dec_bins))
0 ignored issues
show
introduced by
map/filter on lambda could be replaced by comprehension
Loading history...
50
        centers = np.array(map(lambda x: x[1], response.dec_bins))
0 ignored issues
show
introduced by
map/filter on lambda could be replaced by comprehension
Loading history...
51
52
        dec_bins_to_consider_idx = np.flatnonzero((upper_edges >= dec_min) & (lower_edges <= dec_max))
53
54
        # Wrap the selection so we have always one bin before and one after.
55
        # NOTE: we assume that the ROI do not overlap with the very first or the very last dec bin
56
        # Add one dec bin to cover the last part
57
        dec_bins_to_consider_idx = np.append(dec_bins_to_consider_idx, [dec_bins_to_consider_idx[-1] + 1])
58
        # Add one dec bin to cover the first part
59
        dec_bins_to_consider_idx = np.insert(dec_bins_to_consider_idx, 0, [dec_bins_to_consider_idx[0] - 1])
60
61
        self._dec_bins_to_consider = map(lambda x: response.response_bins[centers[x]], dec_bins_to_consider_idx)
0 ignored issues
show
introduced by
map/filter on lambda could be replaced by comprehension
Loading history...
62
63
        print("Considering %i dec bins for extended source %s" % (len(self._dec_bins_to_consider),
64
                                                                  self._name))
65
66
        # Find central bin for the PSF
67
68
        dec_center = (lat_start + lat_stop) / 2.0
69
        #
70
        self._central_response_bins = response.get_response_dec_bin(dec_center, interpolate=False)
71
72
        print("Central bin is bin at Declination = %.3f" % dec_center)
0 ignored issues
show
Unused Code Coding Style introduced by
There is an unnecessary parenthesis after print.
Loading history...
73
74
        # Take note of the pixels within the flat sky projection that actually need to be computed. If the extended
75
        # source is significantly smaller than the flat sky projection, this gains a substantial amount of time
76
77
        idx_lon = _select_with_wrap_around(self._flat_sky_projection.ras, lon_start, lon_stop, (360, 0))
78
        idx_lat = _select_with_wrap_around(self._flat_sky_projection.decs, lat_start, lat_stop, (90, -90))
79
80
        self._active_flat_sky_mask = (idx_lon & idx_lat)
81
82
        assert np.sum(self._active_flat_sky_mask) > 0, "Mismatch between source %s and ROI" % self._name
83
84
        # Get the energies needed for the computation of the flux
85
        self._energy_centers_keV = self._central_response_bins[self._central_response_bins.keys()[0]].sim_energy_bin_centers * 1e9
0 ignored issues
show
Coding Style introduced by
This line is too long as per the coding-style (130/120).

This check looks for lines that are too long. You can specify the maximum line length.

Loading history...
Coding Style Naming introduced by
Attribute name "_energy_centers_keV" doesn't conform to snake_case naming style ('(([a-z_][a-z0-9_]2,30)|(_[a-z0-9_]*)|(__[a-z][a-z0-9_]+__))$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
86
87
        # Prepare array for fluxes
88
        self._all_fluxes = np.zeros((self._flat_sky_projection.ras.shape[0],
89
                                     self._energy_centers_keV.shape[0]))
90
91
    def _setup_callbacks(self, callback):
92
93
        # Register call back with all free parameters and all linked parameters. If a parameter is linked to another
94
        # one, the other one might be in a different source, so we add a callback to that one so that we recompute
95
        # this source when needed.
96
        for parameter in self._source.parameters.values():
97
98
            if parameter.free:
99
                parameter.add_callback(callback)
100
101
            if parameter.has_auxiliary_variable():
102
                # Add a callback to the auxiliary variable so that when that is changed, we need
103
                # to recompute the model
104
                aux_variable, _ = parameter.auxiliary_variable
105
106
                aux_variable.add_callback(callback)
107
108
    def get_source_map(self, energy_bin_id, tag=None):
0 ignored issues
show
Coding Style introduced by
This method should have a docstring.

The coding style of this project requires that you add a docstring to this code element. Below, you find an example for methods:

class SomeClass:
    def some_method(self):
        """Do x and return foo."""

If you would like to know more about docstrings, we recommend to read PEP-257: Docstring Conventions.

Loading history...
109
110
        raise NotImplementedError("You need to implement this in derived classes")
111
112
113
class ConvolvedExtendedSource3D(ConvolvedExtendedSource):
0 ignored issues
show
Coding Style introduced by
This class should have a docstring.

The coding style of this project requires that you add a docstring to this code element. Below, you find an example for methods:

class SomeClass:
    def some_method(self):
        """Do x and return foo."""

If you would like to know more about docstrings, we recommend to read PEP-257: Docstring Conventions.

Loading history...
114
115
    def __init__(self, source, response, flat_sky_projection):
116
117
        super(ConvolvedExtendedSource3D, self).__init__(source, response, flat_sky_projection)
118
119
        # We implement a caching system so that the source flux is evaluated only when strictly needed,
120
        # because it is the most computationally intense part otherwise.
121
        self._recompute_flux = True
122
123
        # Register callback to keep track of the parameter changes
124
        self._setup_callbacks(self._parameter_change_callback)
125
126
    def _parameter_change_callback(self, this_parameter):
0 ignored issues
show
Unused Code introduced by
The argument this_parameter seems to be unused.
Loading history...
127
128
        # A parameter has changed, we need to recompute the function.
129
        # NOTE: we do not recompute it here because if more than one parameter changes at the time (like for example
130
        # during sampling) we do not want to recompute the function multiple time before we get to the convolution
131
        # stage. Therefore we will compute the function in the get_source_map method
132
133
        # print("%s has changed" % this_parameter.name)
134
        self._recompute_flux = True
135
136
    def get_source_map(self, energy_bin_id, tag=None):
137
138
        # We do not use the memoization in astromodels because we are doing caching by ourselves,
139
        # so the astromodels memoization would turn into 100% cache miss and use a lot of RAM for nothing,
140
        # given that we are evaluating the function on many points and many energies
141
        with use_astromodels_memoization(False):
142
143
            # If we need to recompute the flux, let's do it
144
            if self._recompute_flux:
145
146
                # print("recomputing %s" % self._name)
147
148
                # Recompute the fluxes for the pixels that are covered by this extended source
149
                self._all_fluxes[self._active_flat_sky_mask, :] = self._source(
150
                                                            self._flat_sky_projection.ras[self._active_flat_sky_mask],
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation (remove 40 spaces).
Loading history...
151
                                                            self._flat_sky_projection.decs[self._active_flat_sky_mask],
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation (remove 40 spaces).
Loading history...
152
                                                            self._energy_centers_keV)  # 1 / (keV cm^2 s rad^2)
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation (remove 40 spaces).
Loading history...
153
154
                # We don't need to recompute the function anymore until a parameter changes
155
                self._recompute_flux = False
156
157
            # Now compute the expected signal
158
159
            pixel_area_rad2 = self._flat_sky_projection.project_plane_pixel_area * deg2_to_rad2
160
161
            this_model_image = np.zeros(self._all_fluxes.shape[0])
162
163
            # Loop over the Dec bins that cover this source and compute the expected flux, interpolating between
164
            # two dec bins for each point
165
166
            for dec_bin1, dec_bin2 in zip(self._dec_bins_to_consider[:-1], self._dec_bins_to_consider[1:]):
167
168
                # Get the two response bins to consider
169
                this_response_bin1 = dec_bin1[energy_bin_id]
170
                this_response_bin2 = dec_bin2[energy_bin_id]
171
172
                # Figure out which pixels are between the centers of the dec bins we are considering
173
                c1, c2 = this_response_bin1.declination_center, this_response_bin2.declination_center
0 ignored issues
show
Coding Style Naming introduced by
Variable name "c1" doesn't conform to snake_case naming style ('(([a-z_][a-z0-9_]2,30)|(_[a-z0-9_]*)|(__[a-z][a-z0-9_]+__))$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
Coding Style Naming introduced by
Variable name "c2" doesn't conform to snake_case naming style ('(([a-z_][a-z0-9_]2,30)|(_[a-z0-9_]*)|(__[a-z][a-z0-9_]+__))$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
174
175
                idx = (self._flat_sky_projection.decs >= c1) & (self._flat_sky_projection.decs < c2) & \
176
                      self._active_flat_sky_mask
177
178
                # Reweight the spectrum separately for the two bins
179
                # NOTE: the scale is the same because the sim_differential_photon_fluxes are the same (the simulation
180
                # used to make the response used the same spectrum for each bin). What changes between the two bins
181
                # is the observed signal per bin (the .sim_signal_events_per_bin member)
182
                scale = (self._all_fluxes[idx, :] * pixel_area_rad2) / this_response_bin1.sim_differential_photon_fluxes
183
184
                # Compute the interpolation weights for the two responses
185
                w1 = (self._flat_sky_projection.decs[idx] - c2) / (c1 - c2)
0 ignored issues
show
Coding Style Naming introduced by
Variable name "w1" doesn't conform to snake_case naming style ('(([a-z_][a-z0-9_]2,30)|(_[a-z0-9_]*)|(__[a-z][a-z0-9_]+__))$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
186
                w2 = (self._flat_sky_projection.decs[idx] - c1) / (c2 - c1)
0 ignored issues
show
Coding Style Naming introduced by
Variable name "w2" doesn't conform to snake_case naming style ('(([a-z_][a-z0-9_]2,30)|(_[a-z0-9_]*)|(__[a-z][a-z0-9_]+__))$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
187
188
                this_model_image[idx] = (w1 * np.sum(scale * this_response_bin1.sim_signal_events_per_bin, axis=1) +
189
                                         w2 * np.sum(scale * this_response_bin2.sim_signal_events_per_bin, axis=1)) * \
190
                                        1e9
191
192
            # Reshape the flux array into an image
193
            this_model_image = this_model_image.reshape((self._flat_sky_projection.npix_height,
194
                                                         self._flat_sky_projection.npix_width)).T
195
196
            return this_model_image
197
198
199
class ConvolvedExtendedSource2D(ConvolvedExtendedSource3D):
0 ignored issues
show
Coding Style introduced by
This class should have a docstring.

The coding style of this project requires that you add a docstring to this code element. Below, you find an example for methods:

class SomeClass:
    def some_method(self):
        """Do x and return foo."""

If you would like to know more about docstrings, we recommend to read PEP-257: Docstring Conventions.

Loading history...
200
201
    pass
202
203
    # def __init__(self, source, response, flat_sky_projection):
204
    #
205
    #     assert source.spatial_shape.n_dim == 2, "Use the ConvolvedExtendedSource3D for this source"
206
    #
207
    #     super(ConvolvedExtendedSource2D, self).__init__(source, response, flat_sky_projection)
208
    #
209
    #     # Set up the caching
210
    #     self._spectral_part = {}
211
    #     self._spatial_part = None
212
    #
213
    #     # Now make a list of parameters for the spectral shape.
214
    #     self._spectral_parameters = []
215
    #     self._spatial_parameters = []
216
    #
217
    #     for component in self._source.components.values():
218
    #
219
    #         for parameter in component.shape.parameters.values():
220
    #
221
    #             self._spectral_parameters.append(parameter.path)
222
    #
223
    #             # If there are links, we need to keep track of them
224
    #             if parameter.has_auxiliary_variable():
225
    #
226
    #                 aux_variable, _ = parameter.auxiliary_variable
227
    #
228
    #                 self._spectral_parameters.append(aux_variable.path)
229
    #
230
    #     for parameter in self._source.spatial_shape.parameters.values():
231
    #
232
    #         self._spatial_parameters.append(parameter.path)
233
    #
234
    #         # If there are links, we need to keep track of them
235
    #         if parameter.has_auxiliary_variable():
236
    #
237
    #             aux_variable, _ = parameter.auxiliary_variable
238
    #
239
    #             self._spatial_parameters.append(aux_variable.path)
240
    #
241
    #     self._setup_callbacks(self._parameter_change_callback)
242
    #
243
    # def _parameter_change_callback(self, this_parameter):
244
    #
245
    #     if this_parameter.path in self._spectral_parameters:
246
    #
247
    #         # A spectral parameters. Need to recompute the spectrum
248
    #         self._spectral_part = {}
249
    #
250
    #     else:
251
    #
252
    #         # A spatial parameter, need to recompute the spatial shape
253
    #         self._spatial_part = None
254
    #
255
    # def _compute_response_scale(self):
256
    #
257
    #     # First get the differential flux from the spectral components (most likely there is only one component,
258
    #     # but let's do it so it can generalize to multi-component sources)
259
    #
260
    #     results = [component.shape(self._energy_centers_keV) for component in self._source.components.values()]
261
    #
262
    #     this_diff_flux = np.sum(results, 0)
263
    #
264
    #     # NOTE: the sim_differential_photon_fluxes are the same for every bin, so it doesn't matter which
265
    #     # bin I read them from
266
    #
267
    #     scale = this_diff_flux * 1e9 / self._central_response_bins[0].sim_differential_photon_fluxes
268
    #
269
    #     return scale
270
    #
271
    # def get_source_map(self, energy_bin_id, tag=None):
272
    #
273
    #     # We do not use the memoization in astromodels because we are doing caching by ourselves,
274
    #     # so the astromodels memoization would turn into 100% cache miss
275
    #     with use_astromodels_memoization(False):
276
    #
277
    #         # Spatial part: first we try to see if we have it already in the cache
278
    #
279
    #         if self._spatial_part is None:
280
    #
281
    #             # Cache miss, we need to recompute
282
    #
283
    #             # We substitute integration with a discretization, which will work
284
    #             # well if the size of the pixel of the flat sky grid is small enough compared to the features of the
285
    #             # extended source
286
    #             brightness = self._source.spatial_shape(self._flat_sky_projection.ras, self._flat_sky_projection.decs)  # 1 / rad^2
0 ignored issues
show
Coding Style introduced by
This line is too long as per the coding-style (133/120).

This check looks for lines that are too long. You can specify the maximum line length.

Loading history...
287
    #
288
    #             pixel_area_rad2 = self._flat_sky_projection.project_plane_pixel_area * deg2_to_rad2
289
    #
290
    #             integrated_flux = brightness * pixel_area_rad2
291
    #
292
    #             # Now convolve with psf
293
    #             spatial_part = integrated_flux.reshape((self._flat_sky_projection.npix_height,
294
    #                                                     self._flat_sky_projection.npix_width)).T
295
    #
296
    #             # Memorize hoping to reuse it next time
297
    #             self._spatial_part = spatial_part
298
    #
299
    #         # Spectral part
300
    #         spectral_part = self._spectral_part.get(energy_bin_id, None)
301
    #
302
    #         if spectral_part is None:
303
    #
304
    #             spectral_part = np.zeros(self._all_fluxes.shape[0])
305
    #
306
    #             scale = self._compute_response_scale()
307
    #
308
    #             # Loop over the Dec bins that cover this source and compute the expected flux, interpolating between
309
    #             # two dec bins for each point
310
    #
311
    #             for dec_bin1, dec_bin2 in zip(self._dec_bins_to_consider[:-1], self._dec_bins_to_consider[1:]):
312
    #                 # Get the two response bins to consider
313
    #                 this_response_bin1 = dec_bin1[energy_bin_id]
314
    #                 this_response_bin2 = dec_bin2[energy_bin_id]
315
    #
316
    #                 # Figure out which pixels are between the centers of the dec bins we are considering
317
    #                 c1, c2 = this_response_bin1.declination_center, this_response_bin2.declination_center
318
    #
319
    #                 idx = (self._flat_sky_projection.decs >= c1) & (self._flat_sky_projection.decs < c2) & \
320
    #                       self._active_flat_sky_mask
321
    #
322
    #                 # Reweight the spectrum separately for the two bins
323
    #
324
    #                 # Compute the interpolation weights for the two responses
325
    #                 w1 = (self._flat_sky_projection.decs[idx] - c2) / (c1 - c2)
326
    #                 w2 = (self._flat_sky_projection.decs[idx] - c1) / (c2 - c1)
327
    #
328
    #                 spectral_part[idx] = (w1 * np.sum(scale * this_response_bin1.sim_signal_events_per_bin) +
329
    #                                       w2 * np.sum(scale * this_response_bin2.sim_signal_events_per_bin))
330
    #
331
    #             # Memorize hoping to reuse it next time
332
    #             self._spectral_part[energy_bin_id] = spectral_part.reshape((self._flat_sky_projection.npix_height,
333
    #                                                                         self._flat_sky_projection.npix_width)).T
334
    #
335
    #     return self._spectral_part[energy_bin_id] * self._spatial_part
336