Passed
Pull Request — main (#200)
by Chaitanya
01:36
created

asgardpy.data.dataset_1d.Dataset1DGeneration.run()   A

Complexity

Conditions 1

Size

Total Lines 69
Code Lines 36

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 36
nop 1
dl 0
loc 69
rs 9.016
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
"""
2
Main classes to define 1D Dataset Config, 1D Dataset Analysis Step and
3
to generate 1D Datasets from given Instruments' DL3 data from the config.
4
"""
5
6
import logging
7
8
import numpy as np
9
from astropy import units as u
10
from gammapy.datasets import Datasets
11
12
from asgardpy.analysis.step_base import AnalysisStepBase
13
from asgardpy.base.base import BaseConfig
14
from asgardpy.base.geom import (
15
    GeomConfig,
16
    SkyPositionConfig,
17
    generate_geom,
18
    get_source_position,
19
)
20
from asgardpy.base.reduction import (
21
    BackgroundConfig,
22
    MapSelectionEnum,
23
    ObservationsConfig,
24
    ReductionTypeEnum,
25
    SafeMaskConfig,
26
    generate_dl4_dataset,
27
    get_bkg_maker,
28
    get_dataset_maker,
29
    get_dataset_reference,
30
    get_exclusion_region_mask,
31
    get_filtered_observations,
32
    get_safe_mask_maker,
33
)
34
from asgardpy.io.input_dl3 import InputDL3Config  # DL3Files
35
from asgardpy.io.io_dl4 import DL4BaseConfig, DL4Files, get_reco_energy_bins
36
from asgardpy.version import __public_version__
37
38
__all__ = [
39
    "Datasets1DAnalysisStep",
40
    "Dataset1DBaseConfig",
41
    "Dataset1DConfig",
42
    "Dataset1DGeneration",
43
    "Dataset1DInfoConfig",
44
]
45
46
log = logging.getLogger(__name__)
47
48
49
# Defining various components of 1D Dataset Config section
class Dataset1DInfoConfig(BaseConfig):
    """Config section for 1D DL3 Dataset Reduction for each instrument."""

    # Name used for the reduced dataset.
    name: str = "dataset-name"
    # Geometry settings; passed to generate_geom() to build the counts
    # geometry and the exclusion-mask geometry.
    geom: GeomConfig = GeomConfig()
    # Observation selection (IDs / time intervals); passed to
    # get_filtered_observations() and to DL4Files.get_dl4_dataset().
    observation: ObservationsConfig = ObservationsConfig()
    # Background maker settings, including the exclusion-region parameters
    # consumed by get_exclusion_region_mask() and get_bkg_maker().
    background: BackgroundConfig = BackgroundConfig()
    # Safe-mask methods and parameters; consumed by get_safe_mask_maker(),
    # and by Dataset1DGeneration.update_dataset() for "custom-mask".
    safe_mask: SafeMaskConfig = SafeMaskConfig()
    # Sky position of the ON region; passed to get_source_position().
    on_region: SkyPositionConfig = SkyPositionConfig()
    # PSF containment correction toggle — presumably forwarded to the
    # spectrum dataset maker; not referenced directly in this module.
    containment_correction: bool = True
    # Map selection options — consumed by the reduction helpers, not
    # referenced directly in this module.
    map_selection: list[MapSelectionEnum] = []
61
62
63
class Dataset1DBaseConfig(BaseConfig):
    """
    Config section for 1D DL3 Dataset base information for each instrument.
    """

    # Instrument name; used to tag the final datasets and the
    # instrument_spectral_info entries in Datasets1DAnalysisStep.
    name: str = "Instrument-name"
    # Input DL3 file configs; only the first entry's input_dir is read by
    # Dataset1DGeneration.run().
    input_dl3: list[InputDL3Config] = [InputDL3Config()]
    # If True, read already-reduced DL4 data via DL4Files instead of
    # running the DL3 -> DL4 reduction.
    input_dl4: bool = False
    # Reduction settings used when generating the dataset from DL3 data.
    dataset_info: Dataset1DInfoConfig = Dataset1DInfoConfig()
    # DL4 dataset I/O settings; also used to fetch the spectral energy
    # ranges via DL4Files.get_spectral_energies().
    dl4_dataset_info: DL4BaseConfig = DL4BaseConfig()
73
74
75
class Dataset1DConfig(BaseConfig):
    """Config section for a list of all 1D DL3 Datasets information."""

    # Reduction type; defaults to spectrum (1D) extraction.
    type: ReductionTypeEnum = ReductionTypeEnum.spectrum
    # One entry per instrument whose data are to be reduced.
    instruments: list[Dataset1DBaseConfig] = [Dataset1DBaseConfig()]
80
81
82
# The main Analysis Step
class Datasets1DAnalysisStep(AnalysisStepBase):
    """
    From the given config information, prepare the full list of 1D datasets,
    iterating over all the Instruments' information by running the
    Dataset1DGeneration function.
    """

    tag = "datasets-1d"

    def _run(self):
        """
        Build the final list of 1D datasets over all configured instruments.

        Returns
        -------
        tuple
            (datasets_1d_final, None, instrument_spectral_info) where the
            middle element is a placeholder (no models are produced here)
            and instrument_spectral_info collects per-instrument names,
            spectral energy ranges, the total reco energy bin count and the
            number of free parameters (always 0 for this step).
        """
        instruments_list = self.config.dataset1d.instruments
        self.log.info("%d number of 1D Datasets given", len(instruments_list))

        datasets_1d_final = Datasets()
        instrument_spectral_info = {"name": [], "spectral_energy_ranges": []}

        # Running total of reconstructed energy bins over all instruments.
        en_bins = 0

        # Iterate directly over the instrument configs (no index bookkeeping).
        for config_1d_dataset in instruments_list:
            instrument_spectral_info["name"].append(config_1d_dataset.name)
            dl4_files = DL4Files(config_1d_dataset.dl4_dataset_info, self.log)

            if config_1d_dataset.input_dl4:
                # Reduced DL4 data already exist; read them back from disk.
                dataset = dl4_files.get_dl4_dataset(config_1d_dataset.dataset_info.observation)
            else:
                # Run the full DL3 -> DL4 reduction for this instrument.
                generate_1d_dataset = Dataset1DGeneration(self.log, config_1d_dataset, self.config)
                dataset = generate_1d_dataset.run()

            energy_bin_edges = dl4_files.get_spectral_energies()
            instrument_spectral_info["spectral_energy_ranges"].append(energy_bin_edges)

            if self.config.general.stacked_dataset:
                # Stack all observations of this instrument into one dataset.
                dataset = dataset.stack_reduce(name=config_1d_dataset.name)
                en_bins = self._tag_and_collect(dataset, config_1d_dataset.name, datasets_1d_final, en_bins)
            else:
                for data in dataset:
                    en_bins = self._tag_and_collect(data, config_1d_dataset.name, datasets_1d_final, en_bins)

        instrument_spectral_info["en_bins"] = en_bins

        # No linked model parameters or other free model parameters taken here
        instrument_spectral_info["free_params"] = 0

        return (
            datasets_1d_final,
            None,
            instrument_spectral_info,
        )

    @staticmethod
    def _tag_and_collect(data, instrument_name, datasets_final, en_bins):
        """
        Attach instrument/creator metadata to a single dataset, append it to
        the final Datasets container and return the updated reco-energy-bin
        count. Shared by the stacked and unstacked branches of _run().
        """
        data._meta.optional = {
            "instrument": instrument_name,
        }
        data._meta.creation.creator += f", Asgardpy {__public_version__}"
        en_bins = get_reco_energy_bins(data, en_bins)
        datasets_final.append(data)
        return en_bins
145
146
147
class Dataset1DGeneration:
    """
    Class for 1D dataset creation based on the config or AsgardpyConfig
    information provided on the 1D dataset and the target source.

    Runs the following steps:

    1. Read the DL3 files of 1D datasets into DataStore object.

    2. Perform any Observation selection, based on Observation IDs or time intervals.

    3. Create the base dataset reference, including the main counts geometry.

    4. Prepare standard data reduction makers using the parameters passed in the config.

    5. Generate the final dataset.
    """

    def __init__(self, log, config_1d_dataset, config_full):
        """
        Parameters
        ----------
        log : logging.Logger
            Logger used to report the progress of the reduction.
        config_1d_dataset : Dataset1DBaseConfig
            Config of the single instrument whose DL3 data are reduced.
        config_full : AsgardpyConfig
            Full analysis config; only the target and general sections
            (n_jobs, parallel_backend) are read here.
        """
        self.config_1d_dataset_io = config_1d_dataset.input_dl3
        self.log = log
        self.config_1d_dataset_info = config_1d_dataset.dataset_info
        self.config_target = config_full.target
        self.n_jobs = config_full.general.n_jobs
        self.parallel_backend = config_full.general.parallel_backend
        # Accumulates exclusion regions built by get_exclusion_region_mask.
        self.exclusion_regions = []
        self.datasets = Datasets()

    def run(self):
        """
        Main function to run the creation of 1D dataset.

        Returns
        -------
        Datasets
            The reduced DL4 datasets, renamed by Observation ID and with a
            custom safe-energy mask applied when configured.
        """
        # NOTE: an up-front readability check of the DL3 file list
        # (DL3Files.list_dl3_files) was previously disabled here; re-enable
        # via asgardpy.io.input_dl3.DL3Files if such validation is needed.

        # Applying all provided filters to get the Observations object
        observations = get_filtered_observations(
            dl3_path=self.config_1d_dataset_io[0].input_dir,
            obs_config=self.config_1d_dataset_info.observation,
            log=self.log,
        )

        # Get dict information of the ON region, with its SkyCoord position
        # and angular radius
        center_pos = get_source_position(target_region=self.config_1d_dataset_info.on_region)

        # Create the main counts geometry
        geom = generate_geom(tag="1d", geom_config=self.config_1d_dataset_info.geom, center_pos=center_pos)

        # Assemble all the Dataset reduction makers
        dataset_reference, dataset_maker, safe_maker, bkg_maker = self._get_reduction_makers(
            geom, center_pos
        )

        # Produce the final Dataset
        self.datasets = generate_dl4_dataset(
            tag="1d",
            observations=observations,
            dataset_reference=dataset_reference,
            dataset_maker=dataset_maker,
            bkg_maker=bkg_maker,
            safe_maker=safe_maker,
            n_jobs=self.n_jobs,
            parallel_backend=self.parallel_backend,
        )
        self.update_dataset(observations)

        return self.datasets

    def _get_reduction_makers(self, geom, center_pos):
        """
        Build the standard data reduction components from the config.

        Extracted from run() to keep it short (flagged as a Long Method in
        review).

        Parameters
        ----------
        geom : main counts geometry produced by generate_geom(tag="1d").
        center_pos : ON-region position dict from get_source_position().

        Returns
        -------
        tuple
            (dataset_reference, dataset_maker, safe_maker, bkg_maker)
        """
        dataset_reference = get_dataset_reference(
            tag="1d", geom=geom, geom_config=self.config_1d_dataset_info.geom
        )

        dataset_maker = get_dataset_maker(
            tag="1d",
            dataset_config=self.config_1d_dataset_info,
        )

        safe_maker = get_safe_mask_maker(safe_config=self.config_1d_dataset_info.safe_mask)

        # The exclusion mask lives on its own geometry (tag "1d-ex").
        excluded_geom = generate_geom(
            tag="1d-ex", geom_config=self.config_1d_dataset_info.geom, center_pos=center_pos
        )
        exclusion_mask = get_exclusion_region_mask(
            exclusion_params=self.config_1d_dataset_info.background.exclusion,
            exclusion_regions=self.exclusion_regions,
            excluded_geom=excluded_geom,
            config_target=self.config_target,
            geom_config=self.config_1d_dataset_info.geom,
            log=self.log,
        )

        bkg_maker = get_bkg_maker(
            bkg_config=self.config_1d_dataset_info.background,
            exclusion_mask=exclusion_mask,
        )

        return dataset_reference, dataset_maker, safe_maker, bkg_maker

    def update_dataset(self, observations):
        """
        Update the datasets generated by DatasetsMaker with names as per the
        Observation ID and if a custom safe energy mask is provided in the
        config, apply it to each dataset accordingly.
        """
        safe_cfg = self.config_1d_dataset_info.safe_mask
        # Parameters dict; for "custom-mask" it must carry "min"/"max"
        # energy Quantities (as strings parseable by u.Quantity).
        pars = safe_cfg.parameters

        for data, obs in zip(self.datasets, observations, strict=True):
            # Rename the datasets using the appropriate Obs ID
            data._name = str(obs.obs_id)

            # Use custom safe energy mask
            if "custom-mask" in safe_cfg.methods:
                data.mask_safe = data.counts.geom.energy_mask(
                    energy_min=u.Quantity(pars["min"]), energy_max=u.Quantity(pars["max"]), round_to_edge=True
                )
263