Passed
Pull Request — main (#204)
by Chaitanya
01:26
created

AsgardpyAnalysis.run_dl3_dl4_steps()   A

Complexity

Conditions 4

Size

Total Lines 27
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 16
nop 3
dl 0
loc 27
rs 9.6
c 0
b 0
f 0
1
"""
2
Config-driven high level analysis interface.
3
"""
4
5
import logging
6
7
from gammapy.datasets import Datasets
8
from gammapy.modeling.models import CompoundSpectralModel, Models, SkyModel
9
from pydantic import ValidationError
10
11
from asgardpy.analysis.step import AnalysisStep
12
from asgardpy.config import AsgardpyConfig, gammapy_model_to_asgardpy_model_config
13
from asgardpy.data.target import set_models
14
from asgardpy.stats.stats import get_goodness_of_fit_stats
15
16
log = logging.getLogger(__name__)
17
18
__all__ = ["AsgardpyAnalysis"]
19
20
21
class AsgardpyAnalysis:
22
    """
23
    Config-driven high level analysis interface.
24
25
    It is initialized by default with a set of configuration parameters and
26
    values declared in an internal high level interface model, though the user
27
    can also provide configuration parameters passed as a nested dictionary at
28
    the moment of instantiation. In that case these parameters will overwrite
29
    the default values of those present in the configuration file.
30
31
    A specific example of an upgrade of the configuration parameters is when
32
    the Target Models information is provided as a path to a separate yaml file,
33
    which is readable with AsgardpyConfig. In this case, the configuration used
34
    in AsgardpyAnalysis is updated in the initialization step itself.
35
36
    Parameters
37
    ----------
38
    config : dict or `AsgardpyConfig`
39
        Configuration options following `AsgardpyConfig` schema
40
    """
41
42
    def __init__(self, config):
43
        self.log = log
44
        self.config = config
45
46
        if self.config.target.models_file.is_file():
47
            try:
48
                other_config = AsgardpyConfig.read(self.config.target.models_file)
49
                self.config = self.config.update(other_config)
50
            except ValidationError:
51
                self.config = gammapy_model_to_asgardpy_model_config(
52
                    gammapy_model=self.config.target.models_file,
53
                    asgardpy_config_file=self.config,
54
                )
55
56
        self.config.set_logging()
57
        self.datasets = Datasets()
58
        self.instrument_spectral_info = {
59
            "name": [],
60
            "spectral_energy_ranges": [],
61
            "en_bins": 0,
62
            "free_params": 0,
63
            "DoF": 0,
64
        }
65
        self.dataset_name_list = []
66
67
        self.final_model = Models()
68
        self.final_data_products = ["fit", "fit_result", "flux_points", "model_deabs", "flux_points_deabs"]
69
70
        for data_product in self.final_data_products:
71
            setattr(self, data_product, None)
72
73
    @property
74
    def models(self):
75
        """
76
        Display the assigned Models.
77
        """
78
        if not self.datasets:
79
            raise RuntimeError("No datasets defined. Impossible to set models.")
80
        return self.datasets.models
81
82
    @property
83
    def config(self):
84
        """
85
        Analysis configuration (`AsgardpyConfig`)
86
        """
87
        return self._config
88
89
    @config.setter
90
    def config(self, value):
91
        if isinstance(value, dict):
92
            self._config = AsgardpyConfig(**value)
93
        elif isinstance(value, AsgardpyConfig):
94
            self._config = value
95
        else:
96
            raise TypeError("config must be dict or AsgardpyConfig.")
97
98
    def update_models_list(self, models_list):
99
        """
100
        This function updates the final_model object with a given list of
101
        models, only if the target source has a Spatial model included.
102
103
        This step is only valid for 3D Datasets, and might need reconsideration
104
        in the future.
105
        """
106
        if models_list:
107
            target_source_model = models_list[self.config.target.source_name]
108
109
            if target_source_model.spatial_model:
110
                for model_ in models_list:
111
                    self.final_model.append(model_)
112
            else:  # pragma: no cover
113
                self.log.info(
114
                    "The target source %s only has spectral model",
115
                    self.config.target.source_name,
116
                )
117
118
    def get_correct_intrinsic_model(self):
119
        """
120
        After running the Fit analysis step, one can retrieve the intrinsic or
121
        EBL-deabsorbed spectra. This function is just an accessory to the
122
        feature in Gammapy v1.3.
123
        """
124
        # Assuming EBL model is present
125
        if isinstance(self.final_model[0].spectral_model, CompoundSpectralModel):
126
            temp_model = self.final_model[0].spectral_model.model1
127
        else:
128
            temp_model = self.final_model[0].spectral_model
129
130
        self.model_deabs = SkyModel(
131
            spectral_model=temp_model,
132
            name=self.final_model[0].name,
133
            datasets_names=self.final_model[0].datasets_names,
134
        )
135
136
    def get_correct_ebl_deabs_flux_points(self):
137
        """
138
        After running the get_correct_intrinsic_model function, this function
139
        will use the intrinsic model as the reference spectral model to get the
140
        flux points for the intrinsic spectra. This function uses the workaround
141
        as described in Gammapy v1.3.
142
        """
143
        self.flux_points_deabs = []
144
145
        if not self.model_deabs:
146
            self.get_correct_intrinsic_model()
147
148
        for fp in self.flux_points:
149
            self.flux_points_deabs.append(fp.copy(reference_model=self.model_deabs))
150
151
    def add_to_instrument_info(self, info_dict):
152
        """
153
        Update the name, Degrees of Freedom and spectral energy ranges for each
154
        instrument Datasets, to be used for the DL4 to DL5 processes.
155
        """
156
        for name in info_dict["name"]:
157
            self.instrument_spectral_info["name"].append(name)
158
159
        for edges in info_dict["spectral_energy_ranges"]:
160
            self.instrument_spectral_info["spectral_energy_ranges"].append(edges)
161
162
        self.instrument_spectral_info["en_bins"] += info_dict["en_bins"]
163
        self.instrument_spectral_info["free_params"] += info_dict["free_params"]
164
165
    def update_dof_value(self):
166
        """
167
        Simple function to update total Degrees of Freedom value in the
168
        instrument_spectral_info dict from a filled final_model object.
169
        """
170
        if len(self.final_model) > 0:
171
            # Add to the total number of free model parameters
172
            n_free_params = len(list(self.final_model.parameters.free_parameters))
173
            self.instrument_spectral_info["free_params"] += n_free_params
174
175
            # Get the final degrees of freedom as en_bins - free_params
176
            self.instrument_spectral_info["DoF"] = (
177
                self.instrument_spectral_info["en_bins"] - self.instrument_spectral_info["free_params"]
178
            )
179
180
    def run_dl3_dl4_steps(self, dl3_dl4_steps, kwargs):
181
        """
182
        Distinct analysis steps for Dataset reduction from DL3 to DL4 data products.
183
        """
184
        for step in dl3_dl4_steps:
185
            analysis_step = AnalysisStep.create(step, self.config, **kwargs)
186
187
            datasets_list, models_list, instrument_spectral_info = analysis_step.run()
188
189
            self.update_models_list(models_list)
190
191
            # To get all datasets_names from the datasets and update the final datasets list
192
            for data in datasets_list:
193
                if data.name not in self.dataset_name_list:
194
                    self.dataset_name_list.append(data.name)
195
                self.datasets.append(data)
196
197
            self.add_to_instrument_info(instrument_spectral_info)
198
199
        self.datasets, self.final_model = set_models(
200
            self.config.target,
201
            self.datasets,
202
            self.dataset_name_list,
203
            models=self.final_model,
204
        )
205
206
        self.update_dof_value()
207
208
    def run_dl4_dl5_steps(self, dl4_dl5_steps, kwargs):
209
        """
210
        Distinct analysis steps for Joint-Likelihood fitting and transforming
211
        DL4 to DL5 data products.
212
        """
213
        for step in dl4_dl5_steps:
214
            analysis_step = AnalysisStep.create(step, self.config, **kwargs)
215
216
            analysis_step.run(datasets=self.datasets, instrument_spectral_info=self.instrument_spectral_info)
217
218
            # Update the final data product objects
219
            for data_product in self.final_data_products:
220
                if hasattr(analysis_step, data_product):
221
                    setattr(self, data_product, getattr(analysis_step, data_product))
222
223
    def run(self, steps=None, **kwargs):
224
        """
225
        Main function to run the AnalaysisSteps provided.
226
227
        Currently overwrite option is used from the value in AsgardpyConfig,
228
        and is True by default.
229
        """
230
        if steps is None:
231
            steps = self.config.general.steps
232
            self.overwrite = self.config.general.overwrite
233
234
        dl3_dl4_steps = [step for step in steps if "datasets" in step]
235
        dl4_dl5_steps = [step for step in steps if "datasets" not in step]
236
237
        if len(dl3_dl4_steps) > 0:
238
            self.log.info("Perform DL3 to DL4 process!")
239
            self.run_dl3_dl4_steps(dl3_dl4_steps, kwargs)
240
            self.log.info("Models have been associated with the Datasets")
241
242
        if len(dl4_dl5_steps) > 0:
243
            self.log.info("Perform DL4 to DL5 processes!")
244
            self.run_dl4_dl5_steps(dl4_dl5_steps, kwargs)
245
246
        if self.fit_result:
247
            self.instrument_spectral_info, message = get_goodness_of_fit_stats(
248
                self.datasets, self.instrument_spectral_info
249
            )
250
            self.log.info(message)
251
252
    # keep these methods to be backward compatible
253
    def get_1d_datasets(self):
254
        """Produce stacked 1D datasets."""
255
        self.run(steps=["datasets-1d"])
256
257
    def get_3d_datasets(self):
258
        """Produce stacked 3D datasets."""
259
        self.run(steps=["datasets-3d"])
260
261
    def run_fit(self):
262
        """Fitting reduced datasets to model."""
263
        self.run(steps=["fit"])
264
265
    def get_flux_points(self):
266
        """Calculate flux points for a specific model component."""
267
        self.run(steps=["flux-points"])
268
269
    def update_config(self, config):
270
        """Update the primary config with another config."""
271
        self.config = self.config.update(config=config)
272