1 | # Licensed under a 3-clause BSD style license - see LICENSE.rst |
||
2 | from __future__ import absolute_import, division, print_function, unicode_literals |
||
3 | import logging |
||
4 | |||
5 | import numpy as np |
||
6 | from astropy.coordinates import Angle |
||
7 | from scipy.ndimage import distance_transform_edt |
||
0 ignored issues
–
show
introduced
by
Loading history...
|
|||
8 | from regions import PixCoord, CirclePixelRegion |
||
0 ignored issues
–
show
|
|||
9 | |||
10 | from ..maps import WcsNDMap |
||
11 | from .background_estimate import BackgroundEstimate |
||
12 | |||
13 | __all__ = ["ReflectedRegionsFinder", "ReflectedRegionsBackgroundEstimator"] |
||
14 | |||
15 | log = logging.getLogger(__name__) |
||
16 | |||
17 | |||
18 | def _compute_distance_image(mask_map): |
||
19 | """Distance to nearest exclusion region. |
||
20 | |||
21 | Compute distance image, i.e. the Euclidean (=Cartesian 2D) |
||
22 | distance (in pixels) to the nearest exclusion region. |
||
23 | |||
24 | We need to call distance_transform_edt twice because it only computes |
||
25 | dist for pixels outside exclusion regions, so to get the |
||
26 | distances for pixels inside we call it on the inverted mask |
||
27 | and then combine both distance images into one, using negative |
||
28 | distances (note the minus sign) for pixels inside exclusion regions. |
||
29 | |||
30 | If data consist only of ones, it'll be supposed to be far away |
||
31 | from zero pixels, so in capacity of answer it should be return |
||
32 | the matrix with the shape as like as data but packed by constant |
||
33 | value Max_Value (MAX_VALUE = 1e10). |
||
34 | |||
35 | If data consist only of zeros, it'll be supposed to be deep inside |
||
36 | an exclusion region, so in capacity of answer it should be return |
||
37 | the matrix with the shape as like as data but packed by constant |
||
38 | value -Max_Value (MAX_VALUE = 1e10). |
||
39 | |||
40 | Returns |
||
41 | ------- |
||
42 | distance : `~gammapy.maps.WcsNDMap` |
||
43 | Map of distance to nearest exclusion region. |
||
44 | """ |
||
45 | max_value = 1e10 |
||
46 | |||
47 | if np.all(mask_map.data == 1): |
||
48 | dist_map = mask_map.copy(data=mask_map.data * max_value) |
||
49 | return dist_map |
||
50 | |||
51 | if np.all(mask_map.data == 0): |
||
52 | dist_map = mask_map.copy(data=mask_map.data - max_value) |
||
53 | return dist_map |
||
54 | |||
55 | distance_outside = distance_transform_edt(mask_map.data) |
||
56 | |||
57 | invert_mask = np.invert(np.array(mask_map.data, dtype=np.bool)) |
||
58 | distance_inside = distance_transform_edt(invert_mask) |
||
59 | |||
60 | distance = np.where( |
||
61 | mask_map.data, |
||
62 | distance_outside, |
||
63 | -distance_inside, # pylint:disable=invalid-unary-operand-type |
||
64 | ) |
||
65 | |||
66 | return mask_map.copy(data=distance) |
||
67 | |||
68 | |||
69 | class ReflectedRegionsFinder(object): |
||
0 ignored issues
–
show
|
|||
70 | """Find reflected regions. |
||
71 | |||
72 | This class is responsible for placing :ref:`region_reflected` for a given |
||
73 | input region and pointing position. It converts to pixel coordinates |
||
74 | internally. At the moment it works only for circles. If you want to make a |
||
75 | background estimate for an IACT observation using the reflected regions |
||
76 | method, see also `~gammapy.background.ReflectedRegionsBackgroundEstimator` |
||
77 | |||
78 | Parameters |
||
79 | ---------- |
||
80 | region : `~regions.CircleSkyRegion` |
||
81 | Region to rotate |
||
82 | center : `~astropy.coordinates.SkyCoord` |
||
83 | Rotation point |
||
84 | angle_increment : `~astropy.coordinates.Angle`, optional |
||
85 | Rotation angle applied when a region falls in an excluded region. |
||
86 | min_distance : `~astropy.coordinates.Angle`, optional |
||
87 | Minimal distance between to reflected regions |
||
88 | min_distance_input : `~astropy.coordinates.Angle`, optional |
||
89 | Minimal distance from input region |
||
90 | max_region_number : int, optional |
||
91 | Maximum number of regions to use |
||
92 | exclusion_mask : `~gammapy.maps.WcsNDMap`, optional |
||
93 | Exclusion mask |
||
94 | |||
95 | Examples |
||
96 | -------- |
||
97 | >>> from astropy.coordinates import SkyCoord, Angle |
||
98 | >>> from regions import CircleSkyRegion |
||
99 | >>> from gammapy.background import ReflectedRegionsFinder |
||
100 | >>> pointing = SkyCoord(83.2, 22.7, unit='deg', frame='icrs') |
||
101 | >>> target_position = SkyCoord(80.2, 23.5, unit='deg', frame='icrs') |
||
102 | >>> theta = Angle(0.4, 'deg') |
||
103 | >>> on_region = CircleSkyRegion(target_position, theta) |
||
104 | >>> finder = ReflectedRegionsFinder(min_distance_input='1 rad', region=on_region, center=pointing) |
||
0 ignored issues
–
show
|
|||
105 | >>> finder.run() |
||
106 | >>> print(finder.reflected_regions[0]) |
||
107 | Region: CircleSkyRegion |
||
108 | center: <SkyCoord (Galactic): (l, b) in deg |
||
109 | ( 184.9367087, -8.37920222)> |
||
110 | radius: 0.400147197682 deg |
||
111 | """ |
||
112 | |||
113 | def __init__( |
||
0 ignored issues
–
show
|
|||
114 | self, |
||
0 ignored issues
–
show
|
|||
115 | region, |
||
0 ignored issues
–
show
|
|||
116 | center, |
||
0 ignored issues
–
show
|
|||
117 | angle_increment="0.1 rad", |
||
0 ignored issues
–
show
|
|||
118 | min_distance="0 rad", |
||
0 ignored issues
–
show
|
|||
119 | min_distance_input="0.1 rad", |
||
0 ignored issues
–
show
|
|||
120 | max_region_number=10000, |
||
0 ignored issues
–
show
|
|||
121 | exclusion_mask=None, |
||
0 ignored issues
–
show
|
|||
122 | ): |
||
123 | self.region = region |
||
124 | self.center = center |
||
125 | |||
126 | self.angle_increment = Angle(angle_increment) |
||
127 | if self.angle_increment <= Angle(0, "deg"): |
||
128 | raise ValueError("angle_increment is too small") |
||
129 | |||
130 | self.min_distance = Angle(min_distance) |
||
131 | self.min_distance_input = Angle(min_distance_input) |
||
132 | self.exclusion_mask = exclusion_mask |
||
133 | self.max_region_number = max_region_number |
||
134 | self.reflected_regions = None |
||
135 | |||
136 | def run(self): |
||
137 | """Run all steps. |
||
138 | """ |
||
139 | if self.exclusion_mask is None: |
||
140 | self.exclusion_mask = self.make_empty_mask(self.region, self.center) |
||
141 | self.setup() |
||
142 | self.find_regions() |
||
143 | |||
144 | @staticmethod |
||
145 | def make_empty_mask(region, center): |
||
146 | """Create empty exclusion mask. |
||
147 | |||
148 | The size of the mask is chosen such that all reflected region are |
||
149 | contained on the image. |
||
150 | |||
151 | Parameters |
||
152 | ---------- |
||
153 | region : `~regions.CircleSkyRegion` |
||
154 | Region to rotate |
||
155 | center : `~astropy.coordinates.SkyCoord` |
||
156 | Rotation point |
||
157 | """ |
||
158 | log.debug("No exclusion mask provided, creating an emtpy one") |
||
159 | binsz = 0.02 |
||
160 | width = 3 * region.center.separation(center) |
||
161 | |||
162 | maskmap = WcsNDMap.create( |
||
163 | skydir=center, binsz=binsz, width=width, coordsys="GAL", proj="TAN" |
||
164 | ) |
||
165 | maskmap.data += 1.0 |
||
166 | return maskmap |
||
167 | |||
168 | def setup(self): |
||
169 | """Compute parameters for reflected regions algorithm.""" |
||
170 | wcs = self.exclusion_mask.geom.wcs |
||
171 | self._pix_region = self.region.to_pixel(wcs) |
||
0 ignored issues
–
show
|
|||
172 | self._pix_center = PixCoord(*self.center.to_pixel(wcs)) |
||
0 ignored issues
–
show
|
|||
173 | dx = self._pix_region.center.x - self._pix_center.x |
||
174 | dy = self._pix_region.center.y - self._pix_center.y |
||
175 | |||
176 | # Offset of region in pix coordinates |
||
177 | self._offset = np.hypot(dx, dy) |
||
0 ignored issues
–
show
|
|||
178 | |||
179 | # Starting angle of region |
||
180 | self._angle = Angle(np.arctan2(dx, dy), "rad") |
||
0 ignored issues
–
show
|
|||
181 | |||
182 | # Minimum angle a circle has to be moved to not overlap with previous one |
||
183 | min_ang = Angle(2 * np.arcsin(self._pix_region.radius / self._offset), "rad") |
||
184 | |||
185 | # Add required minimal distance between two off regions |
||
186 | self._min_ang = min_ang + self.min_distance |
||
0 ignored issues
–
show
|
|||
187 | |||
188 | # Maximum possible angle before regions is reached again |
||
189 | self._max_angle = ( |
||
0 ignored issues
–
show
|
|||
190 | self._angle + Angle("360deg") - self._min_ang - self.min_distance_input |
||
191 | ) |
||
192 | |||
193 | # Distance image |
||
194 | self._distance_image = _compute_distance_image(self.exclusion_mask) |
||
0 ignored issues
–
show
|
|||
195 | |||
196 | def find_regions(self): |
||
197 | """Find reflected regions.""" |
||
198 | curr_angle = self._angle + self._min_ang + self.min_distance_input |
||
199 | reflected_regions = [] |
||
200 | while curr_angle < self._max_angle: |
||
201 | test_pos = self._compute_xy(self._pix_center, self._offset, curr_angle) |
||
202 | test_reg = CirclePixelRegion(test_pos, self._pix_region.radius) |
||
203 | if not self._is_inside_exclusion(test_reg, self._distance_image): |
||
204 | refl_region = test_reg.to_sky(self.exclusion_mask.geom.wcs) |
||
205 | log.debug("Placing reflected region\n{}".format(refl_region)) |
||
0 ignored issues
–
show
|
|||
206 | reflected_regions.append(refl_region) |
||
207 | curr_angle = curr_angle + self._min_ang |
||
208 | if self.max_region_number <= len(reflected_regions): |
||
209 | break |
||
210 | else: |
||
211 | curr_angle = curr_angle + self.angle_increment |
||
212 | |||
213 | log.debug("Found {} reflected regions".format(len(reflected_regions))) |
||
0 ignored issues
–
show
|
|||
214 | self.reflected_regions = reflected_regions |
||
215 | |||
216 | def plot(self, fig=None, ax=None): |
||
217 | """Standard debug plot. |
||
218 | |||
219 | See example here: :ref:'regions_reflected'. |
||
220 | """ |
||
221 | fig, ax, cbar = self.exclusion_mask.plot(fig=fig, ax=ax, cmap="gray") |
||
0 ignored issues
–
show
|
|||
222 | wcs = self.exclusion_mask.geom.wcs |
||
223 | on_patch = self.region.to_pixel(wcs=wcs).as_artist(color="red", alpha=0.6) |
||
224 | ax.add_patch(on_patch) |
||
225 | |||
226 | for off in self.reflected_regions: |
||
227 | tmp = off.to_pixel(wcs=wcs) |
||
228 | off_patch = tmp.as_artist(color="blue", alpha=0.6) |
||
229 | ax.add_patch(off_patch) |
||
230 | |||
231 | test_pointing = self.center |
||
232 | ax.scatter( |
||
233 | test_pointing.galactic.l.degree, |
||
234 | test_pointing.galactic.b.degree, |
||
235 | transform=ax.get_transform("galactic"), |
||
236 | marker="+", |
||
237 | s=300, |
||
238 | linewidths=3, |
||
239 | color="green", |
||
240 | ) |
||
241 | |||
242 | return fig, ax |
||
243 | |||
244 | @staticmethod |
||
245 | def _is_inside_exclusion(pixreg, distance_image): |
||
246 | """Test if a `~regions.PixRegion` overlaps with an exclusion mask. |
||
247 | |||
248 | If the regions is outside the exclusion mask, return 'False' |
||
249 | """ |
||
250 | x, y = pixreg.center.x, pixreg.center.y |
||
251 | try: |
||
252 | val = distance_image.data[np.round(y).astype(int), np.round(x).astype(int)] |
||
253 | except IndexError: |
||
254 | return False |
||
255 | else: |
||
256 | return val < pixreg.radius |
||
257 | |||
258 | @staticmethod |
||
259 | def _compute_xy(pix_center, offset, angle): |
||
260 | """Compute x, y position for a given position angle and offset. |
||
261 | |||
262 | # TODO: replace by calculation using `astropy.coordinates` |
||
0 ignored issues
–
show
|
|||
263 | """ |
||
264 | dx = offset * np.sin(angle) |
||
265 | dy = offset * np.cos(angle) |
||
266 | x = pix_center.x + dx |
||
267 | y = pix_center.y + dy |
||
268 | return PixCoord(x=x, y=y) |
||
269 | |||
270 | |||
271 | class ReflectedRegionsBackgroundEstimator(object): |
||
0 ignored issues
–
show
|
|||
272 | """Reflected Regions background estimator. |
||
273 | |||
274 | This class is responsible for creating a |
||
275 | `~gammapy.background.BackgroundEstimate` by placing reflected regions given |
||
276 | a target region and an observation. |
||
277 | |||
278 | For a usage example see :gp-extra-notebook:`spectrum_analysis` |
||
279 | |||
280 | Parameters |
||
281 | ---------- |
||
282 | on_region : `~regions.CircleSkyRegion` |
||
283 | Target region |
||
284 | observations : `~gammapy.data.Observations` |
||
285 | Observations to process |
||
286 | kwargs : dict |
||
287 | Forwarded to `gammapy.background.ReflectedRegionsFinder` |
||
288 | """ |
||
289 | |||
290 | def __init__(self, on_region, observations, **kwargs): |
||
291 | self.on_region = on_region |
||
292 | self.observations = observations |
||
293 | self.finder = ReflectedRegionsFinder(region=on_region, center=None, **kwargs) |
||
294 | |||
295 | self.result = None |
||
296 | |||
297 | def __str__(self): |
||
298 | s = self.__class__.__name__ |
||
299 | s += "\n{}".format(self.on_region) |
||
300 | s += "\n{}".format(self.observations) |
||
301 | s += "\n{}".format(self.finder) |
||
302 | return s |
||
303 | |||
304 | def run(self): |
||
305 | """Run all steps.""" |
||
306 | log.debug("Computing reflected regions") |
||
307 | result = [] |
||
308 | for obs in self.observations: |
||
309 | temp = self.process(obs=obs) |
||
310 | result.append(temp) |
||
311 | |||
312 | self.result = result |
||
313 | |||
314 | def process(self, obs): |
||
315 | """Estimate background for one observation.""" |
||
316 | log.debug("Processing observation {}".format(obs)) |
||
0 ignored issues
–
show
|
|||
317 | self.finder.center = obs.pointing_radec |
||
318 | self.finder.run() |
||
319 | off_region = self.finder.reflected_regions |
||
320 | off_events = obs.events.select_circular_region(off_region) |
||
321 | on_events = obs.events.select_circular_region(self.on_region) |
||
322 | a_on = 1 |
||
323 | a_off = len(off_region) |
||
324 | return BackgroundEstimate( |
||
325 | on_region=self.on_region, |
||
326 | on_events=on_events, |
||
327 | off_region=off_region, |
||
328 | off_events=off_events, |
||
329 | a_on=a_on, |
||
330 | a_off=a_off, |
||
331 | method="Reflected Regions", |
||
332 | ) |
||
333 | |||
334 | def plot(self, fig=None, ax=None, cmap=None, idx=None, add_legend=False): |
||
0 ignored issues
–
show
|
|||
335 | """Standard debug plot. |
||
336 | |||
337 | Parameters |
||
338 | ---------- |
||
339 | cmap : `~matplotlib.colors.ListedColormap`, optional |
||
340 | Color map to use |
||
341 | idx : int, optional |
||
342 | Observations to include in the plot, default: all |
||
343 | add_legend : boolean, optional |
||
344 | Enable/disable legend in the plot, default: False |
||
345 | """ |
||
346 | import matplotlib.pyplot as plt |
||
0 ignored issues
–
show
|
|||
347 | |||
348 | fig, ax, cbar = self.finder.exclusion_mask.plot(fig=fig, ax=ax) |
||
349 | |||
350 | wcs = self.finder.exclusion_mask.geom.wcs |
||
351 | on_patch = self.on_region.to_pixel(wcs=wcs).as_artist(color="red") |
||
352 | ax.add_patch(on_patch) |
||
353 | |||
354 | result = self.result |
||
355 | obs_list = list(self.observations) |
||
356 | if idx is not None: |
||
357 | obs_list = np.asarray(self.observations)[idx] |
||
358 | obs_list = np.atleast_1d(obs_list) |
||
359 | result = np.asarray(self.result)[idx] |
||
360 | result = np.atleast_1d(result) |
||
361 | |||
362 | cmap = cmap or plt.get_cmap("viridis") |
||
363 | colors = cmap(np.linspace(0, 1, len(self.observations))) |
||
364 | |||
365 | handles = [] |
||
366 | for idx_ in np.arange(len(obs_list)): |
||
367 | obs = obs_list[idx_] |
||
368 | |||
369 | off_regions = result[idx_].off_region |
||
370 | for off in off_regions: |
||
371 | off_patch = off.to_pixel(wcs=wcs).as_artist( |
||
372 | alpha=0.8, color=colors[idx_], label="Obs {}".format(obs.obs_id) |
||
373 | ) |
||
374 | handle = ax.add_patch(off_patch) |
||
375 | if off_regions: |
||
376 | handles.append(handle) |
||
377 | |||
378 | test_pointing = obs.pointing_radec.galactic |
||
379 | ax.scatter( |
||
380 | test_pointing.l.degree, |
||
381 | test_pointing.b.degree, |
||
382 | transform=ax.get_transform("galactic"), |
||
383 | marker="+", |
||
384 | color=colors[idx_], |
||
385 | s=300, |
||
386 | linewidths=3, |
||
387 | ) |
||
388 | |||
389 | if add_legend: |
||
390 | ax.legend(handles=handles) |
||
391 | |||
392 | return fig, ax, cbar |
||
393 |