ModulationCurveFile.__init__()   B
last analyzed

Complexity

Conditions 6

Size

Total Lines 88
Code Lines 45

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
eloc 45
nop 11
dl 0
loc 88
rs 7.8666
c 0
b 0
f 0

How to fix   Long Method    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
import h5py
2
import numpy as np
3
4
from threeML.utils.polarization.binned_polarization import BinnedModulationCurve
5
6
7
class ModulationCurveFile(object):
8
9
    def __init__(self,
10
                 counts,
11
                 scattering_bins,
12
                 exposures,
13
                 count_errors=None,
14
                 sys_errors=None,
15
                 scale_factor=1.,
16
                 mission=None,
17
                 instrument=None,
18
                 tstart=None,
19
                 tstop=None):
20
        """
21
22
        A interface for the  modulation curve to facilitate  the reading and writing of files
23
24
        :param counts: a matrix of counts where the rows are time intervals and the columns are scattering bins
25
        :param scattering_bins: and array of scattering bin edges
26
        :param exposures: and array of exposures for each time bin
27
        :param count_errors: a matrix of counts where the rows are time intervals and the columns are scattering bins
28
        :param sys_errors: a matrix of counts where the rows are time intervals and the columns are scattering bins
29
        :param scale_factor: the scale factor of the background
30
        :param mission: the space mission
31
        :param instrument: the instrument on the mission
32
        :param tstart: an array of start times for the intervals
33
        :param tstop: an array of stop times for the intervals
34
        """
35
36
        # make sure that all the arrays are the correct shape
37
38
        counts = np.atleast_2d(counts)
39
        exposures = np.atleast_1d(exposures)
40
41
        assert len(exposures.shape) == 1
42
43
        assert len(counts.shape) == 2
44
45
        # extract the shape so we know the interval size
46
        # as well as the number of bins
47
48
        n_intervals, n_bins = counts.shape
49
50
        assert len(exposures) == n_intervals
51
52
        assert len(scattering_bins) == n_bins + 1, 'The shape of the counts is incorrect'
53
54
        assert np.all(exposures >= 0.), 'exposures are not positive'
55
56
        if count_errors is not None:
57
58
            self._is_poisson = False
59
            count_errors = np.atleast_2d(count_errors)
60
61
            assert np.all(count_errors.shape == counts.shape)
62
63
        else:
64
65
            self._is_poisson = True
66
67
        self._count_errors = count_errors
68
69
        # correct start and stops
70
        tmp = [tstart, tstop]
71
72
        for i, _ in enumerate(tmp):
73
74
            if tmp[i] is not None:
75
                tmp[i] = np.atleast_1d(tmp[i])
76
                assert len(tmp[i].shape) == 1
77
                assert len(tmp[i]) == n_intervals
78
79
        ## fix this later
80
        self._sys_errors = sys_errors
81
82
        if instrument is None:
83
            instrument = 'Unknown'
84
85
        if mission is None:
86
            mission = 'Unknown'
87
88
        self._counts = counts
89
        self._scattering_bins = scattering_bins
90
        self._exposures = exposures
91
        self._n_intervals = n_intervals
92
        self._scale_factor = scale_factor
93
        self._tstart = tstart
94
        self._tstop = tstop
95
        self._instrument = instrument
96
        self._mission = mission
97
98
    @classmethod
99
    def read(cls, file_name):
100
101
        with h5py.File(file_name, 'r') as f:
102
103
            n_intervals = int(f.attrs['n_intervals'])
104
            is_poisson = bool(f.attrs['is_poisson'])
105
            scattering_bins = f['scattering_bins'].value
106
            mission = f.attrs['mission']
107
            instrument = f.attrs['instrument']
108
            scale_factor = f.attrs['scale_factor']
109
110
            counts = []
111
            exposures = []
112
113
            if not is_poisson:
114
115
                count_errors = []
116
117
            else:
118
119
                count_errors = None
120
121
            tstart_flag = True
122
            tstop_flag = True
123
            tstart = []
124
            tstop = []
125
126
            for interval in range(n_intervals):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable range does not seem to be defined.
Loading history...
127
                int_grp = f['interval_%d' % interval]
128
129
                counts.append(int_grp['counts'].value)
130
                exposures.append(int_grp.attrs['exposure'])
131
132
                if not is_poisson:
133
                    count_errors.append(int_grp['count_errors'].value)
134
135
                if tstart_flag:
136
                    try:
137
138
                        tstart.append(int_grp.attrs['tstart'])
139
140
                    except:
141
142
                        tstart = None
143
                        tstart_flag = False
144
145
                if tstop_flag:
146
                    try:
147
148
                        tstop.append(int_grp.attrs['tstop'])
149
150
                    except:
151
152
                        tstop = None
153
                        tstop_flag = False
154
155
        return cls(counts=counts,
156
                   exposures=exposures,
157
                   scattering_bins=scattering_bins,
158
                   count_errors=count_errors,
159
                   sys_errors=None,
160
                   scale_factor=scale_factor,
161
                   mission=mission,
162
                   instrument=instrument,
163
                   tstart=tstart,
164
                   tstop=tstop)
165
166
    def writeto(self, file_name):
167
168
        with h5py.File(file_name, 'w') as f:
169
170
            for interval in range(self._n_intervals):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable range does not seem to be defined.
Loading history...
171
172
                int_grp = f.create_group('interval_%d' % interval)
173
174
                int_grp.create_dataset('counts', data=self._counts[interval], compression='lzf')
175
                int_grp.attrs['exposure'] = self._exposures[interval]
176
177
                if self._count_errors is not None:
178
                    int_grp.create_dataset('count_errors', data=self._count_errors[interval], compression='lzf')
179
180
                if self._tstart is not None:
181
                    int_grp.attrs['tstart'] = self._tstart[interval]
182
183
                if self._tstop is not None:
184
                    int_grp.attrs['tstop'] = self._tstop[interval]
185
186
            f.create_dataset('scattering_bins', data=self._scattering_bins, compression='lzf')
187
            f.attrs['n_intervals'] = self._n_intervals
188
            f.attrs['is_poisson'] = self._is_poisson
189
            f.attrs['instrument'] = self._instrument
190
            f.attrs['mission'] = self._mission
191
            f.attrs['scale_factor'] = self._scale_factor
192
193
    def to_binned_modulation_curve(self, interval=0):
194
        """
195
196
        Create a 3ML BinnedModulationCurve from the contents of the interface.
197
        An interval with c-style indexing must be specified.
198
199
        :param interval: The interval to extract. Starting at zero
200
        :return: A 3ML BinnedModulationCurve
201
        """
202
203
        # this keeps us from trying to call create things
204
        # not stored
205
206
        count_errors = None
207
        sys_errors = None
208
        if self._count_errors is not None:
209
            count_errors = self._count_errors[interval]
210
211
        if self._sys_errors is not None:
212
            sys_errors = self._sys_errors[interval]
213
214
        return BinnedModulationCurve(
215
            counts=self._counts[interval],
216
            exposure=self._exposures[interval],
217
            abounds=self._scattering_bins,
218
            count_errors=count_errors,
219
            sys_errors=sys_errors,
220
            scale_factor=self._scale_factor,
221
            is_poisson=self._is_poisson,
222
            instrument=self._instrument,
223
            tstart=self._tstart,
224
            tstop=self._tstop)
225
226
    @classmethod
227
    def from_binned_modulation_curve(cls, binned_mod_curve):
228
        """
229
        Create a modulation curve file from a binned modulation curve
230
        instance. This is really a simple pass thru for writing to
231
        disk
232
        :param binned_mod_curve: a 3ML BinnedModulationCurve
233
        :return: a modulation curve fit file memory instance
234
235
        """
236
        return cls(counts=binned_mod_curve.counts,
237
                   exposures=binned_mod_curve.exposure,
238
                   scattering_bins=binned_mod_curve.edges,
239
                   count_errors=binned_mod_curve.count_errors,
240
                   sys_errors=binned_mod_curve.sys_errors,
241
                   scale_factor=binned_mod_curve.scale_factor,
242
                   mission=binned_mod_curve.mission,
243
                   instrument=binned_mod_curve.instrument,
244
                   tstart=binned_mod_curve.tstart,
245
                   tstop=binned_mod_curve.tstop)
246
247
    @classmethod
248
    def from_list_of_plugins(cls, plugins, kind='total'):
249
250
        assert (kind == 'total') or (kind == 'background'), 'invalid kind. Must be totla or background'
251
252
        out = dict(
253
            tstart=[],
254
            tstop=[],
255
            counts=[],
256
            count_errors=[],
257
            sys_errors=[],
258
            exposures=[],)
259
        for plugin in plugins:
260
261
            #            assert isinstance(plugin, PolarLike), 'must be a PolarLike plugin'
262
263
            if kind == 'total':
264
265
                bmc = plugin.observation
266
267
            elif kind == 'background':
268
269
                bmc = plugin.background
270
271
            out['tstart'].append(bmc.tstart)
0 ignored issues
show
introduced by
The variable bmc does not seem to be defined for all execution paths.
Loading history...
272
            out['tstop'].append(bmc.tstop)
273
            out['counts'].append(bmc.counts)
274
            out['count_errors'].append(bmc.count_errors)
275
            out['sys_errors'].append(bmc.sys_errors)
276
            out['exposures'].append(bmc.exposure)
277
278
        for k, v in out.items():
279
280
            if np.all(np.array(v) == None):
281
                out[k] = None
282
283
        return cls(counts=out['counts'],
284
                   exposures=out['exposures'],
285
                   scattering_bins=bmc.edges,
286
                   count_errors=out['count_errors'],
287
                   sys_errors=out['sys_errors'],
288
                   scale_factor=bmc.scale_factor,
289
                   mission=bmc.mission,
290
                   instrument=bmc.instrument,
291
                   tstart=out['tstart'],
292
                   tstop=out['tstop'])
293
294
    @property
295
    def counts(self):
296
        return self._counts
297
298
    @property
299
    def scattering_bins(self):
300
        return self._scattering_bins
301
302
    @property
303
    def exposures(self):
304
        return self._exposures
305