gammapy.data.data_store — rating: F

Complexity

Total Complexity: 78

Size/Duplication

Total Lines: 719
Duplicated Lines: 0 %

Importance

Changes: 0

Metric   Value
eloc     339
dl       0
loc      719
rs       2.16
c        0
b        0
f        0
wmc      78

(loc = lines of code, dl = duplicated lines, wmc = weighted method complexity,
which matches the total complexity of 78 above.)

29 Methods

Rating   Name   Duplication   Size   Complexity  
A DataStore.__str__() 0 2 1
A DataStore.__init__() 0 3 1
A DataStore.obs_ids() 0 4 1
A DataStore.from_file() 0 29 2
B DataStore.from_dir() 0 60 8
B DataStore.obs() 0 64 8
B DataStore.get_observations() 0 61 6
C DataStore.copy_obs() 0 54 11
A DataStore.info() 0 14 3
A DataStore.check() 0 7 1
A CalDBIRF.from_meta() 0 3 1
A CalDBIRF.__init__() 0 4 1
A DataStoreChecker.check_observations() 0 5 2
A DataStoreChecker.__init__() 0 2 1
A DataStoreMaker.make_hdu_table() 0 18 2
A DataStoreMaker.get_hdu_table_rows() 0 30 1
B DataStoreChecker.check_hdu_table() 0 26 5
A CalDBIRF.file_path() 0 3 1
A DataStoreMaker.get_events_info() 0 7 2
A DataStoreChecker.check_consistency() 0 9 2
A DataStore.from_events_files() 0 65 1
A DataStoreMaker.get_obs_info() 0 3 1
A DataStoreMaker.make_obs_table() 0 27 2
B DataStoreMaker.read_events_info() 0 56 6
A CalDBIRF.file_name() 0 4 1
A DataStoreMaker.__init__() 0 12 4
A DataStoreChecker.check_obs_table() 0 3 1
A CalDBIRF.file_dir() 0 5 1
A DataStoreMaker.run() 0 4 1

How to fix: Complexity

Complex classes like gammapy.data.data_store often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster.
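As a minimal illustration of Extract Class (invented names, not the gammapy API), a group of same-prefix methods moves into its own cohesive class, and the original class delegates to an instance of it:

    # Before: one class mixes index-file bookkeeping with observation access,
    # signalled by the shared "index_" prefix.
    class MonolithicStore:
        def __init__(self, path):
            self.path = path

        def index_read(self):
            return f"index read from {self.path}"

        def index_validate(self):
            return f"index at {self.path} validated"

        def observation(self, obs_id):
            return f"observation {obs_id}"

    # After: the "index_" methods become their own class; the original
    # class keeps only observation access and delegates the rest.
    class IndexFiles:
        def __init__(self, path):
            self.path = path

        def read(self):
            return f"index read from {self.path}"

        def validate(self):
            return f"index at {self.path} validated"

    class Store:
        def __init__(self, path):
            self.index = IndexFiles(path)

        def observation(self, obs_id):
            return f"observation {obs_id}"

Note that the analyzed module already applies this idea in places: checking, index-table creation, and CALDB path handling live in the separate DataStoreChecker, DataStoreMaker, and CalDBIRF classes rather than on DataStore itself.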

# Licensed under a 3-clause BSD style license - see LICENSE.rst
import logging
import subprocess
from pathlib import Path
import numpy as np
from astropy import units as u
from astropy.coordinates import SkyCoord
from astropy.io import fits
from gammapy.utils.scripts import make_path
from gammapy.utils.testing import Checker
from .hdu_index_table import HDUIndexTable
from .obs_table import ObservationTable, ObservationTableChecker
from .observations import Observation, ObservationChecker, Observations

__all__ = ["DataStore"]

ALL_IRFS = ["aeff", "edisp", "psf", "bkg", "rad_max"]
ALL_HDUS = ["events", "gti"] + ALL_IRFS
REQUIRED_IRFS = {
    "full-enclosure": {"aeff", "edisp", "psf", "bkg"},
    "point-like": {"aeff", "edisp"},
    "all-optional": {},
}


class MissingRequiredHDU(IOError):
    pass


log = logging.getLogger(__name__)
log.setLevel(logging.INFO)


class DataStore:
    """IACT data store.

    The data selection and access happens using an observation
    and an HDU index file as described at :ref:`gadf:iact-storage`.

    Parameters
    ----------
    hdu_table : `~gammapy.data.HDUIndexTable`
        HDU index table
    obs_table : `~gammapy.data.ObservationTable`
        Observation index table

    Examples
    --------
    Here's an example of how to create a `DataStore` to access H.E.S.S. data:

    >>> from gammapy.data import DataStore
    >>> data_store = DataStore.from_dir('$GAMMAPY_DATA/hess-dl3-dr1')
    >>> data_store.info() #doctest: +SKIP
    Data store:
    HDU index table:
    BASE_DIR: /Users/ASinha/Gammapy-dev/gammapy-data/hess-dl3-dr1
    Rows: 630
    OBS_ID: 20136 -- 47829
    HDU_TYPE: ['aeff', 'bkg', 'edisp', 'events', 'gti', 'psf']
    HDU_CLASS: ['aeff_2d', 'bkg_3d', 'edisp_2d', 'events', 'gti', 'psf_table']
    <BLANKLINE>
    <BLANKLINE>
    Observation table:
    Observatory name: 'N/A'
    Number of observations: 105
    <BLANKLINE>

    For further usage examples see the :doc:`/tutorials/data/cta` tutorial.
    """

    DEFAULT_HDU_TABLE = "hdu-index.fits.gz"
    """Default HDU table filename."""

    DEFAULT_OBS_TABLE = "obs-index.fits.gz"
    """Default observation table filename."""

    def __init__(self, hdu_table=None, obs_table=None):
        self.hdu_table = hdu_table
        self.obs_table = obs_table

    def __str__(self):
        return self.info(show=False)

    @property
    def obs_ids(self):
        """Return the sorted obs_ids contained in the datastore."""
        return np.unique(self.hdu_table["OBS_ID"].data)

    @classmethod
    def from_file(cls, filename, hdu_hdu="HDU_INDEX", hdu_obs="OBS_INDEX"):
        """Create a `DataStore` from a FITS file.

        The FITS file must contain both index tables.

        Parameters
        ----------
        filename : str, Path
            FITS filename
        hdu_hdu : str or int
            FITS HDU name or number for the HDU index table
        hdu_obs : str or int
            FITS HDU name or number for the observation index table

        Returns
        -------
        data_store : `DataStore`
            Data store
        """
        filename = make_path(filename)

        hdu_table = HDUIndexTable.read(filename, hdu=hdu_hdu, format="fits")

        obs_table = None
        if hdu_obs:
            obs_table = ObservationTable.read(filename, hdu=hdu_obs, format="fits")

        return cls(hdu_table=hdu_table, obs_table=obs_table)

    @classmethod
    def from_dir(cls, base_dir, hdu_table_filename=None, obs_table_filename=None):
        """Create from a directory.

        Parameters
        ----------
        base_dir : str, Path
            Base directory of the data files.
        hdu_table_filename : str, Path
            Filename of the HDU index file. May be specified either relative
            to `base_dir` or as an absolute path. If None, the default filename
            will be looked for.
        obs_table_filename : str, Path
            Filename of the observation index file. May be specified either relative
            to `base_dir` or as an absolute path. If None, the default filename
            will be looked for.

        Returns
        -------
        data_store : `DataStore`
            Data store

        Examples
        --------
        >>> from gammapy.data import DataStore
        >>> data_store = DataStore.from_dir('$GAMMAPY_DATA/hess-dl3-dr1')
        """

        base_dir = make_path(base_dir)

        if hdu_table_filename:
            hdu_table_filename = make_path(hdu_table_filename)
            if (base_dir / hdu_table_filename).exists():
                hdu_table_filename = base_dir / hdu_table_filename
        else:
            hdu_table_filename = base_dir / cls.DEFAULT_HDU_TABLE

        if obs_table_filename:
            obs_table_filename = make_path(obs_table_filename)
            if (base_dir / obs_table_filename).exists():
                obs_table_filename = base_dir / obs_table_filename
            elif not obs_table_filename.exists():
                raise IOError(f"File not found: {obs_table_filename}")
        else:
            obs_table_filename = base_dir / cls.DEFAULT_OBS_TABLE

        if not hdu_table_filename.exists():
            raise OSError(f"File not found: {hdu_table_filename}")
        log.debug(f"Reading {hdu_table_filename}")
        hdu_table = HDUIndexTable.read(hdu_table_filename, format="fits")
        hdu_table.meta["BASE_DIR"] = str(base_dir)

        if not obs_table_filename.exists():
            log.info("Cannot find default obs-index table.")
            obs_table = None
        else:
            log.debug(f"Reading {obs_table_filename}")
            obs_table = ObservationTable.read(obs_table_filename, format="fits")

        return cls(hdu_table=hdu_table, obs_table=obs_table)

    @classmethod
    def from_events_files(cls, events_paths, irfs_paths=None):
        """Create from a list of event filenames.

        HDU and observation index tables will be created from the EVENTS header.

        IRFs are found only if you have a ``CALDB`` environment variable set,
        and if the EVENTS files contain the following keys:

        - ``TELESCOP`` (example: ``TELESCOP = CTA``)
        - ``CALDB`` (example: ``CALDB = 1dc``)
        - ``IRF`` (example: ``IRF = South_z20_50h``)

        This method is useful specifically if you want to load data simulated
        with `ctobssim`_.

        .. _ctobssim: http://cta.irap.omp.eu/ctools/users/reference_manual/ctobssim.html

        Parameters
        ----------
        events_paths : list of str or Path
            List of paths to the events files
        irfs_paths : str, Path, or list of str or Path
            Path to the IRFs file. If a list is provided it must have the same
            length as `events_paths`. If None, the events files have to contain
            CALDB and IRF header keywords to locate the IRF files, otherwise the
            IRFs are assumed to be contained in the events files.

        Returns
        -------
        data_store : `DataStore`
            Data store

        Examples
        --------
        This is how you can access a single event list::

        >>> from gammapy.data import DataStore
        >>> import os
        >>> os.environ["CALDB"] = os.environ["GAMMAPY_DATA"] + "/cta-1dc/caldb"
        >>> path = "$GAMMAPY_DATA/cta-1dc/data/baseline/gps/gps_baseline_110380.fits"
        >>> data_store = DataStore.from_events_files([path])
        >>> observations = data_store.get_observations()

        You can now analyse this data as usual (see any Gammapy tutorial).

        If you have multiple event files, you have to pass the list yourself.
        Here's an example using ``Path.rglob`` to get a list of all events files
        in a given folder::

        >>> import os
        >>> from pathlib import Path
        >>> path = Path(os.environ["GAMMAPY_DATA"]) / "cta-1dc/data"
        >>> paths = list(path.rglob("*.fits"))
        >>> data_store = DataStore.from_events_files(paths)
        >>> observations = data_store.get_observations()

        Note that you have a lot of flexibility to select the observations you
        want, by having a few lines of custom code to prepare ``paths``, or to
        select a subset via a method on the ``data_store`` or the
        ``observations`` objects.

        If you want to generate HDU and observation index files, write the
        tables to disk::

        >>> data_store.hdu_table.write("hdu-index.fits.gz") # doctest: +SKIP
        >>> data_store.obs_table.write("obs-index.fits.gz") # doctest: +SKIP
        """
        return DataStoreMaker(events_paths, irfs_paths).run()

    def info(self, show=True):
        """Print some info."""
        s = "Data store:\n"
        s += self.hdu_table.summary()
        s += "\n\n"
        if self.obs_table:
            s += self.obs_table.summary()
        else:
            s += "No observation index table."

        if show:
            print(s)
        else:
            return s

    def obs(self, obs_id, required_irf="full-enclosure"):
        """Access a given `~gammapy.data.Observation`.

        Parameters
        ----------
        obs_id : int
            Observation ID.
        required_irf : list of str or str
            The list can include the following options:

            * `"events"` : Events
            * `"gti"` : Good time intervals
            * `"aeff"` : Effective area
            * `"bkg"` : Background
            * `"edisp"` : Energy dispersion
            * `"psf"` : Point spread function
            * `"rad_max"` : Maximal radius

            Alternatively, a single string can be used as a shortcut:

            * `"full-enclosure"` : includes `["events", "gti", "aeff", "edisp", "psf", "bkg"]`
            * `"point-like"` : includes `["events", "gti", "aeff", "edisp"]`

        Returns
        -------
        observation : `~gammapy.data.Observation`
            Observation container

        """
        if obs_id not in self.hdu_table["OBS_ID"]:
            raise ValueError(f"OBS_ID = {obs_id} not in HDU index table.")

        kwargs = {"obs_id": int(obs_id)}

        # check for the "short forms"
        if isinstance(required_irf, str):
            required_irf = REQUIRED_IRFS[required_irf]

        if not set(required_irf).issubset(ALL_IRFS):
            difference = set(required_irf).difference(ALL_IRFS)
            raise ValueError(
                f"{difference} is not a valid hdu key. Choose from: {ALL_IRFS}"
            )

        required_hdus = {"events", "gti"}.union(required_irf)

        # Collect the location of every available HDU; remember which of the
        # required ones are missing
        missing_hdus = []
        for hdu in ALL_HDUS:
            hdu_location = self.hdu_table.hdu_location(
                obs_id=obs_id,
                hdu_type=hdu,
                warn_missing=False,
            )
            if hdu_location is not None:
                kwargs[hdu] = hdu_location
            elif hdu in required_hdus:
                missing_hdus.append(hdu)

        if len(missing_hdus) > 0:
            raise MissingRequiredHDU(
                f"Required HDUs {missing_hdus} not found in observation {obs_id}"
            )

        return Observation(**kwargs)

    def get_observations(
        self, obs_id=None, skip_missing=False, required_irf="full-enclosure"
    ):
        """Generate a `~gammapy.data.Observations`.

        Parameters
        ----------
        obs_id : list
            Observation IDs (default of ``None`` means "all").
            If not given, all observations ordered by OBS_ID are returned.
            This is not necessarily the order in the ``obs_table``.
        skip_missing : bool, optional
            Skip missing observations. Default: False.
        required_irf : list of str or str
            Runs will be added to the list of observations only if the
            required HDUs are present; otherwise the given run is skipped.
            The list can include the following options:

            * `"events"` : Events
            * `"gti"` : Good time intervals
            * `"aeff"` : Effective area
            * `"bkg"` : Background
            * `"edisp"` : Energy dispersion
            * `"psf"` : Point spread function
            * `"rad_max"` : Maximal radius

            Alternatively, a single string can be used as a shortcut:

            * `"full-enclosure"` : includes `["events", "gti", "aeff", "edisp", "psf", "bkg"]`
            * `"point-like"` : includes `["events", "gti", "aeff", "edisp"]`
            * `"all-optional"` : no HDUs are required, only warnings will be emitted
              for missing HDUs among all possibilities.

        Returns
        -------
        observations : `~gammapy.data.Observations`
            Container holding a list of `~gammapy.data.Observation`
        """

        if obs_id is None:
            obs_id = self.obs_ids

        obs_list = []

        for oid in obs_id:
            try:
                obs = self.obs(oid, required_irf)
            except ValueError as err:
                if skip_missing:
                    log.warning(f"Skipping missing obs_id: {oid!r}")
                    continue
                else:
                    raise err
            except MissingRequiredHDU as e:
                log.warning(f"Skipping run with missing HDUs; {e}")
                continue

            obs_list.append(obs)

        log.info(f"Observations selected: {len(obs_list)} out of {len(obs_id)}.")
        return Observations(obs_list)

    def copy_obs(self, obs_id, outdir, hdu_class=None, verbose=False, overwrite=False):
        """Create a new `~gammapy.data.DataStore` containing a subset of observations.

        Parameters
        ----------
        obs_id : array-like, `~gammapy.data.ObservationTable`
            List of observations to copy
        outdir : str, Path
            Directory for the new store
        hdu_class : list of str
            See :attr:`gammapy.data.HDUIndexTable.VALID_HDU_CLASS`
        verbose : bool
            Print copied files
        overwrite : bool
            Overwrite existing files
        """
        outdir = make_path(outdir)

        if not outdir.is_dir():
            raise OSError(f"Not a directory: outdir={outdir}")

        if isinstance(obs_id, ObservationTable):
            obs_id = obs_id["OBS_ID"].data

        # Index on OBS_ID so that ``.loc`` can select the requested rows;
        # "discard_on_copy" keeps the temporary index out of the copied subtable
        hdutable = self.hdu_table
        hdutable.add_index("OBS_ID")
        with hdutable.index_mode("discard_on_copy"):
            subhdutable = hdutable.loc[obs_id]
        if hdu_class is not None:
            subhdutable.add_index("HDU_CLASS")
            with subhdutable.index_mode("discard_on_copy"):
                subhdutable = subhdutable.loc[hdu_class]
        if self.obs_table:
            subobstable = self.obs_table.select_obs_id(obs_id)

        for idx in range(len(subhdutable)):
            # Changes to the file structure could be made here
            loc = subhdutable.location_info(idx)
            targetdir = outdir / loc.file_dir
            targetdir.mkdir(exist_ok=True, parents=True)
            cmd = ["cp"]
            if verbose:
                cmd += ["-v"]
            if not overwrite:
                cmd += ["-n"]
            cmd += [str(loc.path()), str(targetdir)]
            subprocess.run(cmd)

        filename = outdir / self.DEFAULT_HDU_TABLE
        subhdutable.write(filename, format="fits", overwrite=overwrite)

        if self.obs_table:
            filename = outdir / self.DEFAULT_OBS_TABLE
            subobstable.write(str(filename), format="fits", overwrite=overwrite)
Analyzer note: The variable subobstable does not seem to be defined when the
``if self.obs_table:`` check above is False. The final write is guarded by the
same condition, so the access is safe in practice, but the analyzer cannot
prove that.
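A minimal way to make the definition explicit (a sketch against the method above, not part of the analyzed file) is to initialize the variable on every path before the conditional:

        # Sketch: subobstable is now defined even when there is no obs table
        subobstable = None
        if self.obs_table:
            subobstable = self.obs_table.select_obs_id(obs_id)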

    def check(self, checks="all"):
        """Check index tables and data files.

        This is a generator that yields check results as dicts.
        """
        checker = DataStoreChecker(self)
        return checker.run(checks=checks)


class DataStoreChecker(Checker):
    """Check data store.

    Checks data format and a bit about the content.
    """

    CHECKS = {
        "obs_table": "check_obs_table",
        "hdu_table": "check_hdu_table",
        "observations": "check_observations",
        "consistency": "check_consistency",
    }

    def __init__(self, data_store):
        self.data_store = data_store

    def check_obs_table(self):
        """Checks for the observation index table."""
        yield from ObservationTableChecker(self.data_store.obs_table).run()

    def check_hdu_table(self):
        """Checks for the HDU index table."""
        t = self.data_store.hdu_table
        m = t.meta
        if m.get("HDUCLAS1", "") != "INDEX":
            yield {
                "level": "error",
                "hdu": "hdu-index",
                "msg": "Invalid header key. Must have HDUCLAS1=INDEX",
            }
        if m.get("HDUCLAS2", "") != "HDU":
            yield {
                "level": "error",
                "hdu": "hdu-index",
                "msg": "Invalid header key. Must have HDUCLAS2=HDU",
            }

        # Check that all HDUs in the data files exist
        for idx in range(len(t)):
            location_info = t.location_info(idx)
            try:
                location_info.get_hdu()
            except KeyError:
                yield {
                    "level": "error",
                    "msg": f"HDU not found: {location_info.__dict__!r}",
                }

    def check_consistency(self):
        """Check consistency between multiple HDUs."""
        # obs and HDU index should have the same OBS_ID
        obs_table_obs_id = set(self.data_store.obs_table["OBS_ID"])
        hdu_table_obs_id = set(self.data_store.hdu_table["OBS_ID"])
        if obs_table_obs_id != hdu_table_obs_id:
            yield {
                "level": "error",
                "msg": "Inconsistent OBS_ID in obs and HDU index tables",
            }
        # TODO: obs table and events header should have the same times

    def check_observations(self):
        """Perform some sanity checks for all observations."""
        for obs_id in self.data_store.obs_table["OBS_ID"]:
            obs = self.data_store.obs(obs_id)
            yield from ObservationChecker(obs).run()


class DataStoreMaker:
    """Create data store index tables.

    This is a multi-step process coded as a class.
    Users will usually call this via `DataStore.from_events_files`.
    """

    def __init__(self, events_paths, irfs_paths=None):
        if isinstance(events_paths, (str, Path)):
            raise TypeError("Need list of paths, not a single string or Path object.")

        self.events_paths = [make_path(path) for path in events_paths]
        if irfs_paths is None or isinstance(irfs_paths, (str, Path)):
            self.irfs_paths = [make_path(irfs_paths)] * len(events_paths)
        else:
            self.irfs_paths = [make_path(path) for path in irfs_paths]

        # Cache for EVENTS file header information, to avoid multiple reads
        self._events_info = {}

    def run(self):
        hdu_table = self.make_hdu_table()
        obs_table = self.make_obs_table()
        return DataStore(hdu_table=hdu_table, obs_table=obs_table)

    def get_events_info(self, events_path, irf_path=None):
        if events_path not in self._events_info:
            self._events_info[events_path] = self.read_events_info(
                events_path, irf_path
            )

        return self._events_info[events_path]

    def get_obs_info(self, events_path, irf_path=None):
        # We could add or remove info here depending on what we want in the obs table
        return self.get_events_info(events_path, irf_path)

    @staticmethod
    def read_events_info(events_path, irf_path=None):
        """Read mandatory events header info."""
        log.debug(f"Reading {events_path}")

        with fits.open(events_path, memmap=False) as hdu_list:
            header = hdu_list["EVENTS"].header

        na_int, na_str = -1, "NOT AVAILABLE"

        info = {}
        # Note: for some reason `header["OBS_ID"]` is sometimes `str`, maybe trailing whitespace
        # mandatory header info:
        info["OBS_ID"] = int(header["OBS_ID"])
        info["TSTART"] = header["TSTART"] * u.s
        info["TSTOP"] = header["TSTOP"] * u.s
        info["ONTIME"] = header["ONTIME"] * u.s
        info["LIVETIME"] = header["LIVETIME"] * u.s
        info["DEADC"] = header["DEADC"]
        info["TELESCOP"] = header.get("TELESCOP", na_str)

        obs_mode = header.get("OBS_MODE", "POINTING")
        if obs_mode == "DRIFT":
            info["ALT_PNT"] = header["ALT_PNT"] * u.deg
            info["AZ_PNT"] = header["AZ_PNT"] * u.deg
            info["ZEN_PNT"] = 90 * u.deg - info["ALT_PNT"]
        else:
            info["RA_PNT"] = header["RA_PNT"] * u.deg
            info["DEC_PNT"] = header["DEC_PNT"] * u.deg

        # optional header info
        pos = SkyCoord(info["RA_PNT"], info["DEC_PNT"], unit="deg").galactic
        info["GLON_PNT"] = pos.l
        info["GLAT_PNT"] = pos.b
        info["DATE-OBS"] = header.get("DATE_OBS", na_str)
        info["TIME-OBS"] = header.get("TIME_OBS", na_str)
        info["DATE-END"] = header.get("DATE_END", na_str)
        info["TIME-END"] = header.get("TIME_END", na_str)
        info["N_TELS"] = header.get("N_TELS", na_int)
        info["OBJECT"] = header.get("OBJECT", na_str)

        # Not part of the spec, but good to know from which file the info comes
        info["EVENTS_FILENAME"] = str(events_path)
        info["EVENT_COUNT"] = header["NAXIS2"]

        # This is the info needed to link from EVENTS to IRFs
        info["CALDB"] = header.get("CALDB", na_str)
        info["IRF"] = header.get("IRF", na_str)
        if irf_path is not None:
            info["IRF_FILENAME"] = str(irf_path)
        elif info["CALDB"] != na_str and info["IRF"] != na_str:
            caldb_irf = CalDBIRF.from_meta(info)
            info["IRF_FILENAME"] = str(caldb_irf.file_path)
        else:
            info["IRF_FILENAME"] = info["EVENTS_FILENAME"]
        return info

    def make_obs_table(self):
        rows = []
        for events_path, irf_path in zip(self.events_paths, self.irfs_paths):
            row = self.get_obs_info(events_path, irf_path)
            rows.append(row)

        names = list(rows[0].keys())
        table = ObservationTable(rows=rows, names=names)

        # TODO: Values copied from one of the EVENTS headers
        # TODO: check consistency for all EVENTS files and handle inconsistent case
        # Transform times to first ref time? Or raise error for now?
        # Test by combining some HESS & CTA runs?
        m = table.meta
        m["MJDREFI"] = 51544
        m["MJDREFF"] = 5.0000000000e-01
        m["TIMEUNIT"] = "s"
        m["TIMESYS"] = "TT"
        m["TIMEREF"] = "LOCAL"

        m["HDUCLASS"] = "GADF"
        m["HDUDOC"] = "https://github.com/open-gamma-ray-astro/gamma-astro-data-formats"
        m["HDUVERS"] = "0.2"
        m["HDUCLAS1"] = "INDEX"
        m["HDUCLAS2"] = "OBS"

        return table

    def make_hdu_table(self):
        rows = []
        for events_path, irf_path in zip(self.events_paths, self.irfs_paths):
            rows.extend(self.get_hdu_table_rows(events_path, irf_path))

        names = list(rows[0].keys())
        # names = ['OBS_ID', 'HDU_TYPE', 'HDU_CLASS', 'FILE_DIR', 'FILE_NAME', 'HDU_NAME']

        table = HDUIndexTable(rows=rows, names=names)

        m = table.meta
        m["HDUCLASS"] = "GADF"
        m["HDUDOC"] = "https://github.com/open-gamma-ray-astro/gamma-astro-data-formats"
        m["HDUVERS"] = "0.2"
        m["HDUCLAS1"] = "INDEX"
        m["HDUCLAS2"] = "HDU"

        return table

    def get_hdu_table_rows(self, events_path, irf_path=None):
        events_info = self.get_obs_info(events_path, irf_path)

        info = dict(
            OBS_ID=events_info["OBS_ID"],
            FILE_DIR=events_path.parent.as_posix(),
            FILE_NAME=events_path.name,
        )
        yield dict(HDU_TYPE="events", HDU_CLASS="events", HDU_NAME="EVENTS", **info)
        yield dict(HDU_TYPE="gti", HDU_CLASS="gti", HDU_NAME="GTI", **info)

        irf_path = Path(events_info["IRF_FILENAME"])
        info = dict(
            OBS_ID=events_info["OBS_ID"],
            FILE_DIR=irf_path.parent.as_posix(),
            FILE_NAME=irf_path.name,
        )
        yield dict(
            HDU_TYPE="aeff", HDU_CLASS="aeff_2d", HDU_NAME="EFFECTIVE AREA", **info
        )
        yield dict(
            HDU_TYPE="edisp", HDU_CLASS="edisp_2d", HDU_NAME="ENERGY DISPERSION", **info
        )
        yield dict(
            HDU_TYPE="psf",
            HDU_CLASS="psf_3gauss",
            HDU_NAME="POINT SPREAD FUNCTION",
            **info,
        )
        yield dict(HDU_TYPE="bkg", HDU_CLASS="bkg_3d", HDU_NAME="BACKGROUND", **info)


# TODO: load IRF file, and infer HDU_CLASS from IRF file contents!
class CalDBIRF:
    """Helper class to work with IRFs in CALDB format."""

    def __init__(self, telescop, caldb, irf):
        self.telescop = telescop
        self.caldb = caldb
        self.irf = irf

    @classmethod
    def from_meta(cls, meta):
        return cls(telescop=meta["TELESCOP"], caldb=meta["CALDB"], irf=meta["IRF"])

    @property
    def file_dir(self):
        # In CTA 1DC the header key is "CTA", but the directory is lower-case "cta"
        telescop = self.telescop.lower()
        return f"$CALDB/data/{telescop}/{self.caldb}/bcf/{self.irf}"

    @property
    def file_path(self):
        return Path(f"{self.file_dir}/{self.file_name}")

    @property
    def file_name(self):
        path = make_path(self.file_dir)
        return list(path.iterdir())[0].name
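
To make the CALDB linkage concrete, here is a minimal sketch (hypothetical header values, not taken from the report) of how CalDBIRF composes an IRF location from EVENTS header keys. Note that file_name and file_path additionally require $CALDB to point at a real CALDB tree, since file_name lists the directory on disk:

    # Sketch: resolve the IRF directory from EVENTS header keywords
    meta = {"TELESCOP": "CTA", "CALDB": "1dc", "IRF": "South_z20_50h"}
    caldb_irf = CalDBIRF.from_meta(meta)
    print(caldb_irf.file_dir)
    # -> $CALDB/data/cta/1dc/bcf/South_z20_50h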