Completed
Push — main ( 71efba...8caadb )
by Chaitanya
39s queued 34s
created

AsgardpyAnalysis.__init__()   A

Complexity

Conditions 4

Size

Total Lines 30
Code Lines 24

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 24
nop 2
dl 0
loc 30
rs 9.304
c 0
b 0
f 0
1
"""
2
Config-driven high level analysis interface.
3
"""
4
5
import logging
6
7
from gammapy.datasets import Datasets
8
from gammapy.modeling.models import Models, SkyModel
9
from pydantic import ValidationError
10
11
from asgardpy.analysis.step import AnalysisStep
12
from asgardpy.config.generator import AsgardpyConfig, gammapy_to_asgardpy_model_config
13
from asgardpy.data.target import set_models
14
from asgardpy.stats.stats import get_goodness_of_fit_stats
15
16
log = logging.getLogger(__name__)
17
18
__all__ = ["AsgardpyAnalysis"]
19
20
21
class AsgardpyAnalysis:
22
    """
23
    Config-driven high level analysis interface.
24
25
    It is initialized by default with a set of configuration parameters and
26
    values declared in an internal high level interface model, though the user
27
    can also provide configuration parameters passed as a nested dictionary at
28
    the moment of instantiation. In that case these parameters will overwrite
29
    the default values of those present in the configuration file.
30
31
    A specific example of an upgrade of the configuration parameters is when
32
    the Target Models information is provided as a path to a separate yaml file,
33
    which is readable with AsgardpyConfig. In this case, the configuration used
34
    in AsgardpyAnalysis is updated in the initialization step itself.
35
36
    Parameters
37
    ----------
38
    config : dict or `AsgardpyConfig`
39
        Configuration options following `AsgardpyConfig` schema
40
    """
41
42
    def __init__(self, config):
43
        self.log = log
44
        self.config = config
45
46
        if self.config.target.models_file.is_file():
47
            try:
48
                other_config = AsgardpyConfig.read(self.config.target.models_file)
49
                self.config = self.config.update(other_config)
50
            except ValidationError:
51
                self.config = gammapy_to_asgardpy_model_config(
52
                    gammapy_model=self.config.target.models_file,
53
                    asgardpy_config_file=self.config,
54
                )
55
56
        self.config.set_logging()
57
        self.datasets = Datasets()
58
        self.instrument_spectral_info = {
59
            "name": [],
60
            "spectral_energy_ranges": [],
61
            "en_bins": 0,
62
            "free_params": 0,
63
            "DoF": 0,
64
        }
65
        self.dataset_name_list = []
66
67
        self.final_model = Models()
68
        self.final_data_products = ["fit", "fit_result", "flux_points", "model_deabs", "flux_points_deabs"]
69
70
        for data_product in self.final_data_products:
71
            setattr(self, data_product, None)
72
73
    @property
74
    def models(self):
75
        """
76
        Display the assigned Models.
77
        """
78
        if not self.datasets:
79
            raise RuntimeError("No datasets defined. Impossible to set models.")
80
        return self.datasets.models
81
82
    @property
83
    def config(self):
84
        """
85
        Analysis configuration (`AsgardpyConfig`)
86
        """
87
        return self._config
88
89
    @config.setter
90
    def config(self, value):
91
        if isinstance(value, dict):
92
            self._config = AsgardpyConfig(**value)
93
        elif isinstance(value, AsgardpyConfig):
94
            self._config = value
95
        else:
96
            raise TypeError("config must be dict or AsgardpyConfig.")
97
98
    def update_models_list(self, models_list):
99
        """
100
        This function updates the final_model object with a given list of
101
        models, only if the target source has a Spatial model included.
102
103
        This step is only valid for 3D Datasets, and might need reconsideration
104
        in the future.
105
        """
106
        if models_list:
107
            target_source_model = models_list[self.config.target.source_name]
108
109
            if target_source_model.spatial_model:
110
                for model_ in models_list:
111
                    self.final_model.append(model_)
112
            else:  # pragma: no cover
113
                self.log.info(
114
                    "The target source %s only has spectral model",
115
                    self.config.target.source_name,
116
                )
117
118
    def get_correct_intrinsic_model(self):
119
        """
120
        After running the Fit analysis step, one can retrieve the correct
121
        covariance matrix for only the intrinsic or EBL-deabsorbed spectra.
122
123
        This function will be redundant in Gammapy v1.3
124
        """
125
        temp_model = self.final_model[0].spectral_model.model1
126
127
        cov_matrix = self.fit.covariance(
128
            datasets=self.datasets, optimize_result=self.fit_result.optimize_result
129
        ).matrix
130
131
        temp_model.covariance = cov_matrix[: len(temp_model.parameters), : len(temp_model.parameters)]
132
133
        self.model_deabs = SkyModel(
134
            spectral_model=temp_model,
135
            name=self.final_model[0].name,
136
            datasets_names=self.final_model[0].datasets_names,
137
        )
138
139
    def get_correct_ebl_deabs_flux_points(self):
140
        """
141
        After running the get_correct_intrinsic_model function, this function
142
        will re-run the flux-points analysis step again, by using only the
143
        intrinsic model as the reference spectral model.
144
145
        This function will be redundant in Gammapy v1.3
146
        """
147
        temp_fp = self.flux_points
148
        self.flux_points = None
149
150
        if not self.model_deabs:
151
            self.get_correct_intrinsic_model()
152
153
        self.run(["flux-points"])
154
155
        for fp in self.flux_points:
156
            fp._models = self.model_deabs
157
            fp._reference_model = self.model_deabs
158
159
        self.flux_points_deabs = self.flux_points
160
        self.flux_points = temp_fp
161
162
    def add_to_instrument_info(self, info_dict):
163
        """
164
        Update the name, Degrees of Freedom and spectral energy ranges for each
165
        instrument Datasets, to be used for the DL4 to DL5 processes.
166
        """
167
        for name in info_dict["name"]:
168
            self.instrument_spectral_info["name"].append(name)
169
170
        for edges in info_dict["spectral_energy_ranges"]:
171
            self.instrument_spectral_info["spectral_energy_ranges"].append(edges)
172
173
        self.instrument_spectral_info["en_bins"] += info_dict["en_bins"]
174
        self.instrument_spectral_info["free_params"] += info_dict["free_params"]
175
176
    def update_dof_value(self):
177
        """
178
        Simple function to update total Degrees of Freedom value in the
179
        instrument_spectral_info dict from a filled final_model object.
180
        """
181
        if len(self.final_model) > 0:
182
            # Add to the total number of free model parameters
183
            n_free_params = len(list(self.final_model.parameters.free_parameters))
184
            self.instrument_spectral_info["free_params"] += n_free_params
185
186
            # Get the final degrees of freedom as en_bins - free_params
187
            self.instrument_spectral_info["DoF"] = (
188
                self.instrument_spectral_info["en_bins"] - self.instrument_spectral_info["free_params"]
189
            )
190
191
    def run(self, steps=None, **kwargs):
192
        """
193
        Main function to run the AnalaysisSteps provided.
194
195
        Currently overwrite option is used from the value in AsgardpyConfig,
196
        and is True by default.
197
        """
198
        if steps is None:
199
            steps = self.config.general.steps
200
            self.overwrite = self.config.general.overwrite
201
202
        dl3_dl4_steps = [step for step in steps if "datasets" in step]
203
        dl4_dl5_steps = [step for step in steps if "datasets" not in step]
204
205
        if len(dl3_dl4_steps) > 0:
206
            self.log.info("Perform DL3 to DL4 process!")
207
208
            for step in dl3_dl4_steps:
209
                analysis_step = AnalysisStep.create(step, self.config, **kwargs)
210
211
                datasets_list, models_list, instrument_spectral_info = analysis_step.run()
212
213
                self.update_models_list(models_list)
214
215
                # To get all datasets_names from the datasets and update the final datasets list
216
                for data in datasets_list:
217
                    if data.name not in self.dataset_name_list:
218
                        self.dataset_name_list.append(data.name)
219
                    self.datasets.append(data)
220
221
                self.add_to_instrument_info(instrument_spectral_info)
222
223
            self.datasets, self.final_model = set_models(
224
                self.config.target,
225
                self.datasets,
226
                self.dataset_name_list,
227
                models=self.final_model,
228
            )
229
            self.log.info("Models have been associated with the Datasets")
230
231
            self.update_dof_value()
232
233
        if len(dl4_dl5_steps) > 0:
234
            self.log.info("Perform DL4 to DL5 processes!")
235
236
            for step in dl4_dl5_steps:
237
                analysis_step = AnalysisStep.create(step, self.config, **kwargs)
238
239
                analysis_step.run(datasets=self.datasets, instrument_spectral_info=self.instrument_spectral_info)
240
241
                # Update the final data product objects
242
                for data_product in self.final_data_products:
243
                    if hasattr(analysis_step, data_product):
244
                        setattr(self, data_product, getattr(analysis_step, data_product))
245
246
        if self.fit_result:
247
            self.instrument_spectral_info, message = get_goodness_of_fit_stats(
248
                self.datasets, self.instrument_spectral_info
249
            )
250
            self.log.info(message)
251
252
    # keep these methods to be backward compatible
253
    def get_1d_datasets(self):
254
        """Produce stacked 1D datasets."""
255
        self.run(steps=["datasets-1d"])
256
257
    def get_3d_datasets(self):
258
        """Produce stacked 3D datasets."""
259
        self.run(steps=["datasets-3d"])
260
261
    def run_fit(self):
262
        """Fitting reduced datasets to model."""
263
        self.run(steps=["fit"])
264
265
    def get_flux_points(self):
266
        """Calculate flux points for a specific model component."""
267
        self.run(steps=["flux-points"])
268
269
    def update_config(self, config):
270
        """Update the primary config with another config."""
271
        self.config = self.config.update(config=config)
272