ModulationCurveFile.counts()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 1
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
import h5py
2
import numpy as np
3
4
from threeML.utils.polarization.binned_polarization import BinnedModulationCurve
5
6
7
class ModulationCurveFile(object):
8
9
    def __init__(self,
10
                 counts,
11
                 scattering_bins,
12
                 exposures,
13
                 count_errors=None,
14
                 sys_errors=None,
15
                 scale_factor=1.,
16
                 mission=None,
17
                 instrument=None,
18
                 tstart=None,
19
                 tstop=None):
20
        """
21
22
        A interface for the  modulation curve to facilitate  the reading and writing of files
23
24
        :param counts: a matrix of counts where the rows are time intervals and the columns are scattering bins
25
        :param scattering_bins: and array of scattering bin edges
26
        :param exposures: and array of exposures for each time bin
27
        :param count_errors: a matrix of counts where the rows are time intervals and the columns are scattering bins
28
        :param sys_errors: a matrix of counts where the rows are time intervals and the columns are scattering bins
29
        :param scale_factor: the scale factor of the background
30
        :param mission: the space mission
31
        :param instrument: the instrument on the mission
32
        :param tstart: an array of start times for the intervals
33
        :param tstop: an array of stop times for the intervals
34
        """
35
36
        # make sure that all the arrays are the correct shape
37
38
        counts = np.atleast_2d(counts)
39
        exposures = np.atleast_1d(exposures)
40
41
        assert len(exposures.shape) == 1
42
43
        assert len(counts.shape) == 2
44
45
        # extract the shape so we know the interval size
46
        # as well as the number of bins
47
48
        n_intervals, n_bins = counts.shape
49
50
        assert len(exposures) == n_intervals
51
52
        assert len(scattering_bins) == n_bins + 1, 'The shape of the counts is incorrect'
53
54
        assert np.all(exposures >= 0.), 'exposures are not positive'
55
56
        if count_errors is not None:
57
58
            self._is_poisson = False
59
            count_errors = np.atleast_2d(count_errors)
60
61
            assert np.all(count_errors.shape == counts.shape)
62
63
        else:
64
65
            self._is_poisson = True
66
67
        self._count_errors = count_errors
68
69
        # correct start and stops
70
        tmp = [tstart, tstop]
71
72
        for i, _ in enumerate(tmp):
73
74
            if tmp[i] is not None:
75
                tmp[i] = np.atleast_1d(tmp[i])
76
                assert len(tmp[i].shape) == 1
77
                assert len(tmp[i]) == n_intervals
78
79
        ## fix this later
80
        self._sys_errors = sys_errors
81
82
        if instrument is None:
83
            instrument = 'Unknown'
84
85
        if mission is None:
86
            mission = 'Unknown'
87
88
        self._counts = counts
89
        self._scattering_bins = scattering_bins
90
        self._exposures = exposures
91
        self._n_intervals = n_intervals
92
        self._scale_factor = scale_factor
93
        self._tstart = tstart
94
        self._tstop = tstop
95
        self._instrument = instrument
96
        self._mission = mission
97
98
    @classmethod
99
    def read(cls, file_name):
100
101
        with h5py.File(file_name, 'r') as f:
102
103
            n_intervals = int(f.attrs['n_intervals'])
104
            is_poisson = bool(f.attrs['is_poisson'])
105
            scattering_bins = f['scattering_bins'].value
106
            mission = f.attrs['mission']
107
            instrument = f.attrs['instrument']
108
            scale_factor = f.attrs['scale_factor']
109
110
            counts = []
111
            exposures = []
112
113
            if not is_poisson:
114
115
                count_errors = []
116
117
            else:
118
119
                count_errors = None
120
121
            tstart_flag = True
122
            tstop_flag = True
123
            tstart = []
124
            tstop = []
125
126
            for interval in range(n_intervals):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable range does not seem to be defined.
Loading history...
127
                int_grp = f['interval_%d' % interval]
128
129
                counts.append(int_grp['counts'].value)
130
                exposures.append(int_grp.attrs['exposure'])
131
132
                if not is_poisson:
133
                    count_errors.append(int_grp['count_errors'].value)
134
135
                if tstart_flag:
136
                    try:
137
138
                        tstart.append(int_grp.attrs['tstart'])
139
140
                    except:
141
142
                        tstart = None
143
                        tstart_flag = False
144
145
                if tstop_flag:
146
                    try:
147
148
                        tstop.append(int_grp.attrs['tstop'])
149
150
                    except:
151
152
                        tstop = None
153
                        tstop_flag = False
154
155
        return cls(counts=counts,
156
                   exposures=exposures,
157
                   scattering_bins=scattering_bins,
158
                   count_errors=count_errors,
159
                   sys_errors=None,
160
                   scale_factor=scale_factor,
161
                   mission=mission,
162
                   instrument=instrument,
163
                   tstart=tstart,
164
                   tstop=tstop)
165
166
    def writeto(self, file_name):
167
168
        with h5py.File(file_name, 'w') as f:
169
170
            for interval in range(self._n_intervals):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable range does not seem to be defined.
Loading history...
171
172
                int_grp = f.create_group('interval_%d' % interval)
173
174
                int_grp.create_dataset('counts', data=self._counts[interval], compression='lzf')
175
                int_grp.attrs['exposure'] = self._exposures[interval]
176
177
                if self._count_errors is not None:
178
                    int_grp.create_dataset('count_errors', data=self._count_errors[interval], compression='lzf')
179
180
                if self._tstart is not None:
181
                    int_grp.attrs['tstart'] = self._tstart[interval]
182
183
                if self._tstop is not None:
184
                    int_grp.attrs['tstop'] = self._tstop[interval]
185
186
            f.create_dataset('scattering_bins', data=self._scattering_bins, compression='lzf')
187
            f.attrs['n_intervals'] = self._n_intervals
188
            f.attrs['is_poisson'] = self._is_poisson
189
            f.attrs['instrument'] = self._instrument
190
            f.attrs['mission'] = self._mission
191
            f.attrs['scale_factor'] = self._scale_factor
192
193
    def to_binned_modulation_curve(self, interval=0):
194
        """
195
196
        Create a 3ML BinnedModulationCurve from the contents of the interface.
197
        An interval with c-style indexing must be specified.
198
199
        :param interval: The interval to extract. Starting at zero
200
        :return: A 3ML BinnedModulationCurve
201
        """
202
203
        # this keeps us from trying to call create things
204
        # not stored
205
206
        count_errors = None
207
        sys_errors = None
208
        if self._count_errors is not None:
209
            count_errors = self._count_errors[interval]
210
211
        if self._sys_errors is not None:
212
            sys_errors = self._sys_errors[interval]
213
214
        return BinnedModulationCurve(
215
            counts=self._counts[interval],
216
            exposure=self._exposures[interval],
217
            abounds=self._scattering_bins,
218
            count_errors=count_errors,
219
            sys_errors=sys_errors,
220
            scale_factor=self._scale_factor,
221
            is_poisson=self._is_poisson,
222
            instrument=self._instrument,
223
            tstart=self._tstart,
224
            tstop=self._tstop)
225
226
    @classmethod
227
    def from_binned_modulation_curve(cls, binned_mod_curve):
228
        """
229
        Create a modulation curve file from a binned modulation curve
230
        instance. This is really a simple pass thru for writing to
231
        disk
232
        :param binned_mod_curve: a 3ML BinnedModulationCurve
233
        :return: a modulation curve fit file memory instance
234
235
        """
236
        return cls(counts=binned_mod_curve.counts,
237
                   exposures=binned_mod_curve.exposure,
238
                   scattering_bins=binned_mod_curve.edges,
239
                   count_errors=binned_mod_curve.count_errors,
240
                   sys_errors=binned_mod_curve.sys_errors,
241
                   scale_factor=binned_mod_curve.scale_factor,
242
                   mission=binned_mod_curve.mission,
243
                   instrument=binned_mod_curve.instrument,
244
                   tstart=binned_mod_curve.tstart,
245
                   tstop=binned_mod_curve.tstop)
246
247
    @classmethod
248
    def from_list_of_plugins(cls, plugins, kind='total'):
249
250
        assert (kind == 'total') or (kind == 'background'), 'invalid kind. Must be totla or background'
251
252
        out = dict(
253
            tstart=[],
254
            tstop=[],
255
            counts=[],
256
            count_errors=[],
257
            sys_errors=[],
258
            exposures=[],)
259
        for plugin in plugins:
260
261
            #            assert isinstance(plugin, PolarLike), 'must be a PolarLike plugin'
262
263
            if kind == 'total':
264
265
                bmc = plugin.observation
266
267
            elif kind == 'background':
268
269
                bmc = plugin.background
270
271
            out['tstart'].append(bmc.tstart)
0 ignored issues
show
introduced by
The variable bmc does not seem to be defined for all execution paths.
Loading history...
272
            out['tstop'].append(bmc.tstop)
273
            out['counts'].append(bmc.counts)
274
            out['count_errors'].append(bmc.count_errors)
275
            out['sys_errors'].append(bmc.sys_errors)
276
            out['exposures'].append(bmc.exposure)
277
278
        for k, v in out.items():
279
280
            if np.all(np.array(v) == None):
281
                out[k] = None
282
283
        return cls(counts=out['counts'],
284
                   exposures=out['exposures'],
285
                   scattering_bins=bmc.edges,
286
                   count_errors=out['count_errors'],
287
                   sys_errors=out['sys_errors'],
288
                   scale_factor=bmc.scale_factor,
289
                   mission=bmc.mission,
290
                   instrument=bmc.instrument,
291
                   tstart=out['tstart'],
292
                   tstop=out['tstop'])
293
294
    @property
295
    def counts(self):
296
        return self._counts
297
298
    @property
299
    def scattering_bins(self):
300
        return self._scattering_bins
301
302
    @property
303
    def exposures(self):
304
        return self._exposures
305