Passed
Push — master ( 20ef60...3972dc )
by Giacomo
10:31
created

hawc_hal.response.response.HAWCResponse.write()   B

Complexity

Conditions 5

Size

Total Lines 46
Code Lines 24

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 24
dl 0
loc 46
rs 8.8373
c 0
b 0
f 0
cc 5
nop 2
1
import numpy as np
0 ignored issues
show
Coding Style introduced by
This module should have a docstring.

The coding style of this project requires that you add a docstring to this code element. Below, you find an example for methods:

class SomeClass:
    def some_method(self):
        """Do x and return foo."""

If you would like to know more about docstrings, we recommend to read PEP-257: Docstring Conventions.

Loading history...
introduced by
Unable to import 'numpy'
Loading history...
Unused Code introduced by
Unused numpy imported as np
Loading history...
2
import pandas as pd
0 ignored issues
show
introduced by
Unable to import 'pandas'
Loading history...
3
import os
0 ignored issues
show
introduced by
standard import "import os" should be placed before "import numpy as np"
Loading history...
4
import collections
0 ignored issues
show
introduced by
standard import "import collections" should be placed before "import numpy as np"
Loading history...
5
6
from ..serialize import Serialization
7
8
from threeML.io.file_utils import file_existing_and_readable, sanitize_filename
0 ignored issues
show
introduced by
Unable to import 'threeML.io.file_utils'
Loading history...
introduced by
first party import "from threeML.io.file_utils import file_existing_and_readable, sanitize_filename" should be placed before "from ..serialize import Serialization"
Loading history...
9
from threeML.exceptions.custom_exceptions import custom_warnings
0 ignored issues
show
introduced by
Unable to import 'threeML.exceptions.custom_exceptions'
Loading history...
introduced by
first party import "from threeML.exceptions.custom_exceptions import custom_warnings" should be placed before "from ..serialize import Serialization"
Loading history...
10
11
from ..psf_fast import PSFWrapper
12
from response_bin import ResponseBin
0 ignored issues
show
Coding Style introduced by
Relative import 'response_bin', should be 'hawc_hal.response.response_bin'
Loading history...
introduced by
first party import "from response_bin import ResponseBin" should be placed before "from ..serialize import Serialization"
Loading history...
13
14
15
_instances = {}
0 ignored issues
show
Coding Style Naming introduced by
Constant name "_instances" doesn't conform to UPPER_CASE naming style ('(([A-Z_][A-Z0-9_]*)|(__.*__))$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
16
17
18
def hawc_response_factory(response_file_name):
19
    """
20
    A factory function for the response which keeps a cache, so that the same response is not read over and
21
    over again.
22
23
    :param response_file_name:
24
    :return: an instance of HAWCResponse
25
    """
26
27
    response_file_name = sanitize_filename(response_file_name, abspath=True)
28
29
    # See if this response is in the cache, if not build it
30
31
    if not response_file_name in _instances:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable _instances does not seem to be defined.
Loading history...
32
33
        print("Creating singleton for %s" % response_file_name)
0 ignored issues
show
Unused Code Coding Style introduced by
There is an unnecessary parenthesis after print.
Loading history...
34
35
        # Use the extension of the file to figure out which kind of response it is (ROOT or HDF)
36
37
        extension = os.path.splitext(response_file_name)[-1]
38
39
        if extension == ".root":
40
41
            new_instance = HAWCResponse.from_root_file(response_file_name)
42
43
        elif extension in ['.hd5', '.hdf5']:
44
45
            new_instance = HAWCResponse.from_hdf5(response_file_name)
46
47
        else:  # pragma: no cover
48
49
            raise NotImplementedError("Extension %s for response file %s not recognized." % (extension,
50
                                                                                             response_file_name))
51
52
        _instances[response_file_name] = new_instance
53
54
    # return the response, whether it was already in the cache or we just built it
55
56
    return _instances[response_file_name]  # type: HAWCResponse
57
58
59
class HAWCResponse(object):
0 ignored issues
show
Coding Style introduced by
This class should have a docstring.

The coding style of this project requires that you add a docstring to this code element. Below, you find an example for methods:

class SomeClass:
    def some_method(self):
        """Do x and return foo."""

If you would like to know more about docstrings, we recommend to read PEP-257: Docstring Conventions.

Loading history...
60
61
    def __init__(self, response_file_name, dec_bins, response_bins):
62
63
        self._response_file_name = response_file_name
64
        self._dec_bins = dec_bins
65
        self._response_bins = response_bins
66
67
    @classmethod
68
    def from_hdf5(cls, response_file_name):
0 ignored issues
show
Coding Style introduced by
This method should have a docstring.

The coding style of this project requires that you add a docstring to this code element. Below, you find an example for methods:

class SomeClass:
    def some_method(self):
        """Do x and return foo."""

If you would like to know more about docstrings, we recommend to read PEP-257: Docstring Conventions.

Loading history...
Comprehensibility introduced by
This function exceeds the maximum number of variables (31/15).
Loading history...
69
70
        response_bins = collections.OrderedDict()
71
72
        with Serialization(response_file_name, mode='r') as serializer:
73
74
            meta_dfs, _ = serializer.retrieve_pandas_object('/dec_bins_definition')
75
            effarea_dfs, _ = serializer.retrieve_pandas_object('/effective_area')
76
            psf_dfs, _ = serializer.retrieve_pandas_object('/psf')
77
78
        declination_centers = effarea_dfs.index.levels[0]
79
        energy_bins = effarea_dfs.index.levels[1]
80
81
        min_decs = []
82
        max_decs = []
83
84
        for dec_center in declination_centers:
85
86
            these_response_bins = collections.OrderedDict()
87
88
            for i, energy_bin in enumerate(energy_bins):
89
90
                these_meta = meta_dfs.loc[dec_center, energy_bin]
91
92
                min_dec = these_meta['min_dec']
93
                max_dec = these_meta['max_dec']
94
                dec_center_ = these_meta['declination_center']
95
96
                assert dec_center_ == dec_center, "Response is corrupted"
97
98
                # If this is the first energy bin, let's store the minimum and maximum dec of this bin
99
                if i == 0:
100
101
                    min_decs.append(min_dec)
102
                    max_decs.append(max_dec)
103
104
                else:
105
106
                    # Check that the minimum and maximum declination for this bin are the same as for
107
                    # the first energy bin
108
                    assert min_dec == min_decs[-1], "Response is corrupted"
109
                    assert max_dec == max_decs[-1], "Response is corrupted"
110
111
                sim_n_sig_events = these_meta['n_sim_signal_events']
112
                sim_n_bg_events = these_meta['n_sim_bkg_events']
113
114
                this_effarea_df = effarea_dfs.loc[dec_center, energy_bin]
115
116
                sim_energy_bin_low = this_effarea_df.loc[:, 'sim_energy_bin_low'].values
117
                sim_energy_bin_centers = this_effarea_df.loc[:, 'sim_energy_bin_centers'].values
118
                sim_energy_bin_hi = this_effarea_df.loc[:, 'sim_energy_bin_hi'].values
119
                sim_differential_photon_fluxes = this_effarea_df.loc[:, 'sim_differential_photon_fluxes'].values
120
                sim_signal_events_per_bin = this_effarea_df.loc[:, 'sim_signal_events_per_bin'].values
121
122
                this_psf = PSFWrapper.from_pandas(psf_dfs.loc[dec_center, energy_bin])
123
124
                this_response_bin = ResponseBin(energy_bin, min_dec, max_dec, dec_center,
125
                                                sim_n_sig_events, sim_n_bg_events,
126
                                                sim_energy_bin_low,
127
                                                sim_energy_bin_centers,
128
                                                sim_energy_bin_hi,
129
                                                sim_differential_photon_fluxes,
130
                                                sim_signal_events_per_bin,
131
                                                this_psf)
132
133
                these_response_bins[energy_bin] = this_response_bin
134
135
            # Store the response bins for this declination bin
136
137
            response_bins[dec_center] = these_response_bins
138
139
        dec_bins = zip(min_decs, declination_centers, max_decs)
140
141
        return cls(response_file_name, dec_bins, response_bins)
142
143
    @classmethod
144
    def from_root_file(cls, response_file_name):
0 ignored issues
show
Coding Style introduced by
This method should have a docstring.

The coding style of this project requires that you add a docstring to this code element. Below, you find an example for methods:

class SomeClass:
    def some_method(self):
        """Do x and return foo."""

If you would like to know more about docstrings, we recommend to read PEP-257: Docstring Conventions.

Loading history...
Comprehensibility introduced by
This function exceeds the maximum number of variables (25/15).
Loading history...
145
146
        from ..root_handler import open_ROOT_file, get_list_of_keys, tree_to_ndarray
147
148
        # Make sure file is readable
149
150
        response_file_name = sanitize_filename(response_file_name)
151
152
        # Check that they exists and can be read
153
154
        if not file_existing_and_readable(response_file_name):  # pragma: no cover
155
            raise IOError("Response %s does not exist or is not readable" % response_file_name)
156
157
        # Read response
158
159
        with open_ROOT_file(response_file_name) as f:
0 ignored issues
show
Coding Style Naming introduced by
Variable name "f" doesn't conform to snake_case naming style ('(([a-z_][a-z0-9_]2,30)|(_[a-z0-9_]*)|(__[a-z][a-z0-9_]+__))$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
160
161
            # Get the name of the trees
162
            object_names = get_list_of_keys(f)
163
164
            # Make sure we have all the things we need
165
166
            assert 'LogLogSpectrum' in object_names
167
            assert 'DecBins' in object_names
168
            assert 'AnalysisBins' in object_names
169
170
            # Read spectrum used during the simulation
171
            log_log_spectrum = f.Get("LogLogSpectrum")
172
173
            # Get the analysis bins definition
174
            dec_bins_ = tree_to_ndarray(f.Get("DecBins"))
175
176
            dec_bins_lower_edge = dec_bins_['lowerEdge']  # type: np.ndarray
177
            dec_bins_upper_edge = dec_bins_['upperEdge']  # type: np.ndarray
178
            dec_bins_center = dec_bins_['simdec']  # type: np.ndarray
179
180
            dec_bins = zip(dec_bins_lower_edge, dec_bins_center, dec_bins_upper_edge)
181
182
            # Read in the ids of the response bins ("analysis bins" in LiFF jargon)
183
            try:
184
185
                response_bins_ids = tree_to_ndarray(f.Get("AnalysisBins"), "name")  # type: np.ndarray
186
187
            except ValueError:
188
189
                try:
190
                
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
191
                    response_bins_ids = tree_to_ndarray(f.Get("AnalysisBins"), "id")  # type: np.ndarray
192
                
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
193
                except ValueError:
194
195
                    # Some old response files (or energy responses) have no "name" branch
196
                    custom_warnings.warn("Response %s has no AnalysisBins 'id' or 'name' branch. "
197
                                     "Will try with default names" % response_file_name)
0 ignored issues
show
Coding Style introduced by
Wrong continued indentation (add 4 spaces).
Loading history...
198
199
                    response_bins_ids = None
200
            response_bins_ids = response_bins_ids.astype(str)
201
202
            # Now we create a dictionary of ResponseBin instances for each dec bin_name
203
            response_bins = collections.OrderedDict()
204
205
            for dec_id in range(len(dec_bins)):
0 ignored issues
show
unused-code introduced by
Consider using enumerate instead of iterating with range and len
Loading history...
206
207
                this_response_bins = collections.OrderedDict()
208
209
                min_dec, dec_center, max_dec = dec_bins[dec_id]
210
211
                # If we couldn't get the reponse_bins_ids above, let's use the default names
212
                if response_bins_ids is None:
213
214
                    # Default are just integers. let's read how many nHit bins are from the first dec bin
215
                    dec_id_label = "dec_%02i" % dec_id
216
217
                    n_energy_bins = f.Get(dec_id_label).GetNkeys()
218
219
                    response_bins_ids = range(n_energy_bins)
220
221
                for response_bin_id in response_bins_ids:
222
223
                    this_response_bin = ResponseBin.from_ttree(f, dec_id, response_bin_id, log_log_spectrum,
224
                                                               min_dec, dec_center, max_dec)
225
226
                    this_response_bins[response_bin_id] = this_response_bin
227
228
                response_bins[dec_bins[dec_id][1]] = this_response_bins
229
230
        # Now the file is closed. Let's explicitly remove f so we are sure it is freed
231
        del f
232
233
        # Instance the class and return it
234
        instance = cls(response_file_name, dec_bins, response_bins)
235
236
        return instance
237
238
    def get_response_dec_bin(self, dec, interpolate=False):
239
        """
240
        Get the responses for the provided declination bin, optionally interpolating the PSF
241
242
        :param dec: the declination where the response is desired at
243
        :param interpolate: whether to interpolate or not the PSF between the two closes response bins
244
        :return:
245
        """
246
247
        # Sort declination bins by distance to the provided declination
248
        dec_bins_keys = self._response_bins.keys()
249
        dec_bins_by_distance = sorted(dec_bins_keys, key=lambda x: abs(x - dec))
250
251
        if not interpolate:
252
253
            # Find the closest dec bin_name. We iterate over all the dec bins because we don't want to assume
254
            # that the bins are ordered by Dec in the file (and the operation is very cheap anyway,
255
            # since the dec bins are few)
256
257
            closest_dec_key = dec_bins_by_distance[0]
258
259
            return self._response_bins[closest_dec_key]
260
261
        else:
262
263
            # Find the two closest responses
264
            dec_bin_one, dec_bin_two = dec_bins_by_distance[:2]
265
266
            # Let's handle the special case where the requested dec is exactly on a response bin
267
            if abs(dec_bin_one - dec) < 0.01:
268
269
                # Do not interpolate
270
                return self._response_bins[dec_bin_one]
271
272
            energy_bins_one = self._response_bins[dec_bin_one]
273
            energy_bins_two = self._response_bins[dec_bin_two]
274
275
            # Now linearly interpolate between them
276
277
            # Compute the weights according to the distance to the source
278
            w1 = (dec - dec_bin_two) / (dec_bin_one - dec_bin_two)
0 ignored issues
show
Coding Style Naming introduced by
Variable name "w1" doesn't conform to snake_case naming style ('(([a-z_][a-z0-9_]2,30)|(_[a-z0-9_]*)|(__[a-z][a-z0-9_]+__))$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
279
            w2 = (dec - dec_bin_one) / (dec_bin_two - dec_bin_one)
0 ignored issues
show
Coding Style Naming introduced by
Variable name "w2" doesn't conform to snake_case naming style ('(([a-z_][a-z0-9_]2,30)|(_[a-z0-9_]*)|(__[a-z][a-z0-9_]+__))$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
280
281
            new_responses = collections.OrderedDict()
282
283
            for bin_id in energy_bins_one:
284
285
                this_new_response = energy_bins_one[bin_id].combine_with_weights(energy_bins_two[bin_id], dec, w1, w2)
286
287
                new_responses[bin_id] = this_new_response
288
289
            return new_responses
290
291
292
    @property
293
    def dec_bins(self):
0 ignored issues
show
Coding Style introduced by
This method should have a docstring.

The coding style of this project requires that you add a docstring to this code element. Below, you find an example for methods:

class SomeClass:
    def some_method(self):
        """Do x and return foo."""

If you would like to know more about docstrings, we recommend to read PEP-257: Docstring Conventions.

Loading history...
294
295
        return self._dec_bins
296
297
    @property
298
    def response_bins(self):
0 ignored issues
show
Coding Style introduced by
This method should have a docstring.

The coding style of this project requires that you add a docstring to this code element. Below, you find an example for methods:

class SomeClass:
    def some_method(self):
        """Do x and return foo."""

If you would like to know more about docstrings, we recommend to read PEP-257: Docstring Conventions.

Loading history...
299
300
        return self._response_bins
301
302
    @property
303
    def n_energy_planes(self):
0 ignored issues
show
Coding Style introduced by
This method should have a docstring.

The coding style of this project requires that you add a docstring to this code element. Below, you find an example for methods:

class SomeClass:
    def some_method(self):
        """Do x and return foo."""

If you would like to know more about docstrings, we recommend to read PEP-257: Docstring Conventions.

Loading history...
304
305
        return len(self._response_bins.values()[0])
306
307
    def display(self, verbose=False):
308
        """
309
        Prints summary of the current object content.
310
311
        :param verbose bool: Prints the full list of declinations and analysis bins.
312
        """
313
314
        print("Response file: %s" % self._response_file_name)
0 ignored issues
show
Unused Code Coding Style introduced by
There is an unnecessary parenthesis after print.
Loading history...
315
        print("Number of dec bins: %s" % len(self._dec_bins))
0 ignored issues
show
Unused Code Coding Style introduced by
There is an unnecessary parenthesis after print.
Loading history...
316
        if verbose:
317
            print self._dec_bins
318
        print("Number of energy/nHit planes per dec bin_name: %s" % (self.n_energy_planes))
0 ignored issues
show
Unused Code Coding Style introduced by
There is an unnecessary parenthesis after print.
Loading history...
319
        if verbose:
320
            print self._response_bins.values()[0].keys()
321
322
    def write(self, filename):
0 ignored issues
show
Comprehensibility introduced by
This function exceeds the maximum number of variables (19/15).
Loading history...
323
        """
324
        Write the response to HDF5.
325
326
        :param filename: output file. WARNING: it will be overwritten if existing.
327
        :return:
328
        """
329
330
        filename = sanitize_filename(filename)
331
332
        # Unravel the dec bins
333
        min_decs, center_decs, max_decs = zip(*self._dec_bins)
0 ignored issues
show
Unused Code introduced by
The variable max_decs seems to be unused.
Loading history...
Unused Code introduced by
The variable min_decs seems to be unused.
Loading history...
334
335
        # We get the definition of the response bins, as well as their coordinates (the dec center) and store them
336
        # in lists. Later on we will use these to make 3 dataframes containing all the needed data
337
        multi_index_keys = []
338
        effarea_dfs = []
339
        psf_dfs = []
340
        all_metas = []
341
342
        # Loop over all the dec bins (making sure that they are in order)
343
        for dec_center in sorted(center_decs):
344
345
            for bin_id in self._response_bins[dec_center]:
346
347
                response_bin = self._response_bins[dec_center][bin_id]
348
                this_effarea_df, this_meta, this_psf_df = response_bin.to_pandas()
349
350
                effarea_dfs.append(this_effarea_df)
351
                psf_dfs.append(this_psf_df)
352
                assert bin_id == response_bin.name, \
353
                    'Bin name inconsistency: {} != {}'.format(bin_id, response_bin.name)
354
                multi_index_keys.append((dec_center, response_bin.name))
355
                all_metas.append(pd.Series(this_meta))
356
357
        # Create the dataframe with all the effective areas (with a multi-index)
358
        effarea_df = pd.concat(effarea_dfs, axis=0, keys=multi_index_keys)
359
        psf_df = pd.concat(psf_dfs, axis=0, keys=multi_index_keys)
360
        meta_df = pd.concat(all_metas, axis=1, keys=multi_index_keys).T
361
362
        # Now write the 4 dataframes to file
363
        with Serialization(filename, mode='w') as serializer:
364
365
            serializer.store_pandas_object('/dec_bins_definition', meta_df)
366
            serializer.store_pandas_object('/effective_area', effarea_df)
367
            serializer.store_pandas_object('/psf', psf_df)
368