Passed
Pull Request — master (#2033)
by
unknown
02:19
created

gammapy.data.observations   F

Complexity

Total Complexity 60

Size/Duplication

Total Lines 444
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 217
dl 0
loc 444
rs 3.6
c 0
b 0
f 0
wmc 60

39 Methods

Rating   Name   Duplication   Size   Complexity  
A DataStoreObservation.bkg() 0 4 1
A DataStoreObservation.observation_live_time_duration() 0 10 1
A DataStoreObservation.pointing_altaz() 0 5 1
A DataStoreObservation.edisp() 0 4 1
A DataStoreObservation.psf() 0 4 1
A DataStoreObservation.observation_dead_time_fraction() 0 15 1
A DataStoreObservation.obs_info() 0 5 1
A DataStoreObservation.events() 0 5 1
A DataStoreObservation.observation_time_duration() 0 7 1
A DataStoreObservation.load() 0 17 1
A DataStoreObservation.__init__() 0 10 3
A DataStoreObservation.gti() 0 13 2
A DataStoreObservation.pointing_radec() 0 5 1
A DataStoreObservation.location() 0 17 1
A DataStoreObservation.aeff() 0 4 1
A DataStoreObservation.pointing_zen() 0 4 1
A DataStoreObservation.tstart() 0 7 1
A DataStoreObservation.__str__() 0 14 1
A DataStoreObservation.tstop() 0 7 1
A ObservationChecker._record() 0 2 1
A DataStoreObservation.select_time() 0 19 1
A Observations.select_time() 0 21 3
A ObservationChecker.__init__() 0 2 1
A ObservationChecker.check_events() 0 10 2
A ObservationChecker.check_edisp() 0 12 3
A Observations.__init__() 0 2 1
A DataStoreObservation.check() 0 7 1
A DataStoreObservation.peek() 0 3 1
A ObservationChecker._check_times() 0 20 3
A Observations.__getitem__() 0 2 1
A Observations.__len__() 0 2 1
A ObservationChecker.check_gti() 0 17 5
B ObservationChecker.check_aeff() 0 23 6
A DataStoreObservation.fixed_pointing_info() 0 4 1
A DataStoreObservation.target_radec() 0 5 1
A DataStoreObservation.observatory_earth_location() 0 4 1
A ObservationChecker.check_psf() 0 8 2
A DataStoreObservation.muoneff() 0 4 1
A Observations.__str__() 0 6 2

How to fix   Complexity   

Complexity

Complex classes like gammapy.data.observations often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# Licensed under a 3-clause BSD style license - see LICENSE.rst
2
import logging
3
import numpy as np
4
from collections import OrderedDict
5
from astropy.coordinates import SkyCoord
6
from astropy.units import Quantity
7
from astropy.time import Time
8
from .event_list import EventListChecker
9
from ..utils.testing import Checker
10
from ..utils.fits import earth_location_from_dict
11
from ..utils.table import table_row_to_dict
12
from ..utils.time import time_ref_from_dict
13
from .filters import ObservationFilter
14
from .pointing import FixedPointingInfo
15
16
__all__ = ["DataStoreObservation", "Observations"]
17
18
log = logging.getLogger(__name__)
19
20
21
class DataStoreObservation:
22
    """IACT data store observation.
23
24
    Parameters
25
    ----------
26
    obs_id : int
27
        Observation ID
28
    data_store : `~gammapy.data.DataStore`
29
        Data store
30
    obs_filter : `~gammapy.data.ObservationFilter`, optional
31
        Filter for the observation
32
    """
33
34
    def __init__(self, obs_id, data_store, obs_filter=None):
35
        # Assert that `obs_id` is available
36
        if obs_id not in data_store.obs_table["OBS_ID"]:
37
            raise ValueError("OBS_ID = {} not in obs index table.".format(obs_id))
38
        if obs_id not in data_store.hdu_table["OBS_ID"]:
39
            raise ValueError("OBS_ID = {} not in HDU index table.".format(obs_id))
40
41
        self.obs_id = obs_id
42
        self.data_store = data_store
43
        self.obs_filter = obs_filter or ObservationFilter()
44
45
    def __str__(self):
46
        ss = "Info for OBS_ID = {}\n".format(self.obs_id)
47
        ss += "- Start time: {:.2f}\n".format(self.tstart.mjd)
48
        ss += "- Pointing pos: RA {:.2f} / Dec {:.2f}\n".format(
49
            self.pointing_radec.ra, self.pointing_radec.dec
50
        )
51
        ss += "- Observation duration: {}\n".format(self.observation_time_duration)
52
        ss += "- Dead-time fraction: {:5.3f} %\n".format(
53
            100 * self.observation_dead_time_fraction
54
        )
55
56
        # TODO: Which target was observed?
57
        # TODO: print info about available HDUs for this observation ...
58
        return ss
59
60
    def location(self, hdu_type=None, hdu_class=None):
61
        """HDU location object.
62
63
        Parameters
64
        ----------
65
        hdu_type : str
66
            HDU type (see `~gammapy.data.HDUIndexTable.VALID_HDU_TYPE`)
67
        hdu_class : str
68
            HDU class (see `~gammapy.data.HDUIndexTable.VALID_HDU_CLASS`)
69
70
        Returns
71
        -------
72
        location : `~gammapy.data.HDULocation`
73
            HDU location
74
        """
75
        return self.data_store.hdu_table.hdu_location(
76
            obs_id=self.obs_id, hdu_type=hdu_type, hdu_class=hdu_class
77
        )
78
79
    def load(self, hdu_type=None, hdu_class=None):
80
        """Load data file as appropriate object.
81
82
        Parameters
83
        ----------
84
        hdu_type : str
85
            HDU type (see `~gammapy.data.HDUIndexTable.VALID_HDU_TYPE`)
86
        hdu_class : str
87
            HDU class (see `~gammapy.data.HDUIndexTable.VALID_HDU_CLASS`)
88
89
        Returns
90
        -------
91
        object : object
92
            Object depends on type, e.g. for `events` it's a `~gammapy.data.EventList`.
93
        """
94
        location = self.location(hdu_type=hdu_type, hdu_class=hdu_class)
95
        return location.load()
96
97
    @property
98
    def events(self):
99
        """Load `gammapy.data.EventList` object and apply the filter."""
100
        events = self.load(hdu_type="events")
101
        return self.obs_filter.filter_events(events)
102
103
    @property
104
    def gti(self):
105
        """Load `gammapy.data.GTI` object and apply the filter."""
106
        try:
107
            gti = self.load(hdu_type="gti")
108
        except IndexError:
109
            # For now we support data without GTI HDUs
110
            # TODO: if GTI becomes required, we should drop this case
111
            # CTA discussion in https://github.com/open-gamma-ray-astro/gamma-astro-data-formats/issues/20
112
            # Added in Gammapy in https://github.com/gammapy/gammapy/pull/1908
113
            gti = self.data_store.obs_table.create_gti(obs_id=self.obs_id)
114
115
        return self.obs_filter.filter_gti(gti)
116
117
    @property
118
    def aeff(self):
119
        """Load effective area object."""
120
        return self.load(hdu_type="aeff")
121
122
    @property
123
    def edisp(self):
124
        """Load energy dispersion object."""
125
        return self.load(hdu_type="edisp")
126
127
    @property
128
    def psf(self):
129
        """Load point spread function object."""
130
        return self.load(hdu_type="psf")
131
132
    @property
133
    def bkg(self):
134
        """Load background object."""
135
        return self.load(hdu_type="bkg")
136
137
    @property
138
    def obs_info(self):
139
        """Observation information (`~collections.OrderedDict`)."""
140
        row = self.data_store.obs_table.select_obs_id(obs_id=self.obs_id)[0]
141
        return table_row_to_dict(row)
142
143
    @property
144
    def tstart(self):
145
        """Observation start time (`~astropy.time.Time`)."""
146
        met_ref = time_ref_from_dict(self.data_store.obs_table.meta)
147
        met = Quantity(self.obs_info["TSTART"].astype("float64"), "second")
148
        time = met_ref + met
149
        return time
150
151
    @property
152
    def tstop(self):
153
        """Observation stop time (`~astropy.time.Time`)."""
154
        met_ref = time_ref_from_dict(self.data_store.obs_table.meta)
155
        met = Quantity(self.obs_info["TSTOP"].astype("float64"), "second")
156
        time = met_ref + met
157
        return time
158
159
    @property
160
    def observation_time_duration(self):
161
        """Observation time duration in seconds (`~astropy.units.Quantity`).
162
163
        The wall time, including dead-time.
164
        """
165
        return self.gti.time_sum
166
167
    @property
168
    def observation_live_time_duration(self):
169
        """Live-time duration in seconds (`~astropy.units.Quantity`).
170
171
        The dead-time-corrected observation time.
172
173
        Computed as ``t_live = t_observation * (1 - f_dead)``
174
        where ``f_dead`` is the dead-time fraction.
175
        """
176
        return self.gti.time_sum * (1 - self.observation_dead_time_fraction)
177
178
    @property
179
    def observation_dead_time_fraction(self):
180
        """Dead-time fraction (float).
181
182
        Defined as dead-time over observation time.
183
184
        Dead-time is defined as the time during the observation
185
        where the detector didn't record events:
186
        https://en.wikipedia.org/wiki/Dead_time
187
        https://adsabs.harvard.edu/abs/2004APh....22..285F
188
189
        The dead-time fraction is used in the live-time computation,
190
        which in turn is used in the exposure and flux computation.
191
        """
192
        return 1 - self.obs_info["DEADC"]
193
194
    @property
195
    def pointing_radec(self):
196
        """Pointing RA / DEC sky coordinates (`~astropy.coordinates.SkyCoord`)."""
197
        lon, lat = self.obs_info["RA_PNT"], self.obs_info["DEC_PNT"]
198
        return SkyCoord(lon, lat, unit="deg", frame="icrs")
199
200
    @property
201
    def pointing_altaz(self):
202
        """Pointing ALT / AZ sky coordinates (`~astropy.coordinates.SkyCoord`)."""
203
        alt, az = self.obs_info["ALT_PNT"], self.obs_info["AZ_PNT"]
204
        return SkyCoord(az, alt, unit="deg", frame="altaz")
205
206
    @property
207
    def pointing_zen(self):
208
        """Pointing zenith angle sky (`~astropy.units.Quantity`)."""
209
        return Quantity(self.obs_info["ZEN_PNT"], unit="deg")
210
211
    @property
212
    def fixed_pointing_info(self):
213
        """Fixed pointing info for this observation (`FixedPointingInfo`)."""
214
        return FixedPointingInfo(self.events.table.meta)
215
216
    @property
217
    def target_radec(self):
218
        """Target RA / DEC sky coordinates (`~astropy.coordinates.SkyCoord`)."""
219
        lon, lat = self.obs_info["RA_OBJ"], self.obs_info["DEC_OBJ"]
220
        return SkyCoord(lon, lat, unit="deg", frame="icrs")
221
222
    @property
223
    def observatory_earth_location(self):
224
        """Observatory location (`~astropy.coordinates.EarthLocation`)."""
225
        return earth_location_from_dict(self.obs_info)
226
227
    @property
228
    def muoneff(self):
229
        """Observation muon efficiency."""
230
        return self.obs_info["MUONEFF"]
231
232
    def peek(self):
233
        """Quick-look plots in a few panels."""
234
        raise NotImplementedError
235
236
    def select_time(self, time_interval):
237
        """Select a time interval of the observation.
238
239
        Parameters
240
        ----------
241
        time_interval : `astropy.time.Time`
242
            Start and stop time of the selected time interval.
243
            For now we only support a single time interval.
244
245
        Returns
246
        -------
247
        new_obs : `~gammapy.data.DataStoreObservation`
248
            A new observation instance of the specified time interval
249
        """
250
        new_obs_filter = self.obs_filter.copy()
251
        new_obs_filter.time_filter = time_interval
252
253
        return self.__class__(
254
            obs_id=self.obs_id, data_store=self.data_store, obs_filter=new_obs_filter
255
        )
256
257
    def check(self, checks="all"):
258
        """Run checks.
259
260
        This is a generator that yields a list of dicts.
261
        """
262
        checker = ObservationChecker(self)
263
        return checker.run(checks=checks)
264
265
266
class Observations:
267
    """Container class that holds a list of observations.
268
269
    Parameters
270
    ----------
271
    obs_list : list
272
        A list of `~gammapy.data.DataStoreObservation`
273
    """
274
275
    def __init__(self, obs_list=None):
276
        self.list = obs_list or []
277
278
    def __getitem__(self, key):
279
        return self.list[key]
280
281
    def __len__(self):
282
        return len(self.list)
283
284
    def __str__(self):
285
        s = self.__class__.__name__ + "\n"
286
        s += "Number of observations: {}\n".format(len(self))
287
        for obs in self:
288
            s += str(obs)
289
        return s
290
291
    def select_time(self, time_interval):
292
        """Select a time interval of the observations.
293
294
        Parameters
295
        ----------
296
        time_interval : `astropy.time.Time`
297
            Start and stop time of the selected time interval.
298
            For now we only support a single time interval.
299
300
        Returns
301
        -------
302
        new_observations : `~gammapy.data.Observations`
303
            A new observations instance of the specified time interval
304
        """
305
        new_obs_list = []
306
        for obs in self:
307
            new_obs = obs.select_time(time_interval)
308
            if len(new_obs.gti.table) > 0:
309
                new_obs_list.append(new_obs)
310
311
        return self.__class__(new_obs_list)
312
313
314
class ObservationChecker(Checker):
315
    """Check an observation.
316
317
    Checks data format and a bit about the content.
318
    """
319
320
    CHECKS = {
321
        "events": "check_events",
322
        "gti": "check_gti",
323
        "aeff": "check_aeff",
324
        "edisp": "check_edisp",
325
        "psf": "check_psf",
326
    }
327
328
    def __init__(self, observation):
329
        self.observation = observation
330
331
    def _record(self, level="info", msg=None):
332
        return {"level": level, "obs_id": self.observation.obs_id, "msg": msg}
333
334
    def check_events(self):
335
        yield self._record(level="debug", msg="Starting events check")
336
337
        try:
338
            events = self.observation.load("events")
339
        except Exception:
340
            yield self._record(level="warning", msg="Loading events failed")
341
            return
342
343
        yield from EventListChecker(events).run()
344
345
    # TODO: split this out into a GTIChecker
346
    def check_gti(self):
347
        yield self._record(level="debug", msg="Starting gti check")
348
349
        try:
350
            gti = self.observation.load("gti")
351
        except Exception:
352
            yield self._record(level="warning", msg="Loading GTI failed")
353
            return
354
355
        if len(gti.table) == 0:
356
            yield self._record(level="error", msg="GTI table has zero rows")
357
358
        columns_required = ["START", "STOP"]
359
        for name in columns_required:
360
            if name not in gti.table.colnames:
361
                yield self._record(
362
                    level="error", msg="Missing table column: {!r}".format(name)
363
                )
364
365
        # TODO: Check that header keywords agree with table entries
366
        # TSTART, TSTOP, MJDREFI, MJDREFF
367
368
        # Check that START and STOP times are consecutive
369
        # times = np.ravel(self.table['START'], self.table['STOP'])
370
        # # TODO: not sure this is correct ... add test with a multi-gti table from Fermi.
371
        # if not np.all(np.diff(times) >= 0):
372
        #     yield 'GTIs are not consecutive or sorted.'
373
374
    # TODO: add reference times for all instruments and check for this
375
    # Use TELESCOP header key to check which instrument it is.
376
    def _check_times(self):
377
        """Check if various times are consistent.
378
379
        The headers and tables of the FITS EVENTS and GTI extension
380
        contain various observation and event time information.
381
        """
382
        # http://fermi.gsfc.nasa.gov/ssc/data/analysis/documentation/Cicerone/Cicerone_Data/Time_in_ScienceTools.html
383
        # https://hess-confluence.desy.de/confluence/display/HESS/HESS+FITS+data+-+References+and+checks#HESSFITSdata-Referencesandchecks-Time
384
        telescope_met_refs = OrderedDict(
385
            FERMI=Time("2001-01-01T00:00:00"), HESS=Time("2001-01-01T00:00:00")
386
        )
387
388
        meta = self.dset.event_list.table.meta
389
        telescope = meta["TELESCOP"]
390
391
        if telescope in telescope_met_refs.keys():
392
            dt = self.time_ref - telescope_met_refs[telescope]
393
            if dt > self.accuracy["time"]:
394
                yield self._record(
395
                    level="error", msg="Reference time incorrect for telescope"
396
                )
397
398
    def check_aeff(self):
399
        yield self._record(level="debug", msg="Starting aeff check")
400
401
        try:
402
            aeff = self.observation.load("aeff")
403
        except Exception:
404
            yield self._record(level="warning", msg="Loading aeff failed")
405
            return
406
407
        # Check that thresholds are meaningful for aeff
408
        if (
409
            "LO_THRES" in aeff.meta
410
            and "HI_THRES" in aeff.meta
411
            and aeff.meta["LO_THRES"] >= aeff.meta["HI_THRES"]
412
        ):
413
            yield self._record(
414
                level="error", msg="LO_THRES >= HI_THRES in effective area meta data"
415
            )
416
417
        # Check that data isn't all null
418
        if np.max(aeff.data.data) <= 0:
419
            yield self._record(
420
                level="error", msg="maximum entry of effective area is <= 0"
421
            )
422
423
    def check_edisp(self):
424
        yield self._record(level="debug", msg="Starting edisp check")
425
426
        try:
427
            edisp = self.observation.load("edisp")
428
        except Exception:
429
            yield self._record(level="warning", msg="Loading edisp failed")
430
            return
431
432
        # Check that data isn't all null
433
        if np.max(edisp.data.data) <= 0:
434
            yield self._record(level="error", msg="maximum entry of edisp is <= 0")
435
436
    def check_psf(self):
437
        yield self._record(level="debug", msg="Starting psf check")
438
439
        try:
440
            psf = self.observation.load("psf")
441
        except Exception:
442
            yield self._record(level="warning", msg="Loading psf failed")
443
            return
444
445
        # TODO: implement some basic check
446
        # The following doesn't work, because EnergyDependentMultiGaussPSF
447
        # has no attribute `data`
448
        # Check that data isn't all null
449
        # if np.max(psf.data.data) <= 0:
450
        #     yield self._record(
451
        #         level="error", msg="maximum entry of psf is <= 0"
452
        #     )
453