Passed
Pull Request — main (#157)
by Chaitanya
04:40 queued 02:37
created

asgardpy.data.target   B

Complexity

Total Complexity 46

Size/Duplication

Total Lines 564
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 257
dl 0
loc 564
rs 8.72
c 0
b 0
f 0
wmc 46

How to fix   Complexity   

Complexity

Complex classes like asgardpy.data.target often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
Classes containing the Target config parameters for the high-level interface and
3
also the functions involving Models generation and assignment to datasets.
4
"""
5
6
from enum import Enum
7
8
import astropy.units as u
9
import numpy as np
10
from gammapy.modeling import Parameter
11
from gammapy.modeling.models import (
12
    SPATIAL_MODEL_REGISTRY,
13
    SPECTRAL_MODEL_REGISTRY,
14
    DatasetModels,
15
    EBLAbsorptionNormSpectralModel,
16
    FoVBackgroundModel,
17
    Models,
18
    SkyModel,
19
    SpectralModel,
20
)
21
22
from asgardpy.base.base import AngleType, BaseConfig, FrameEnum, PathType
23
from asgardpy.base.geom import SkyPositionConfig
24
25
__all__ = [
26
    "BrokenPowerLaw2SpectralModel",
27
    "EBLAbsorptionModel",
28
    "ExpCutoffLogParabolaSpectralModel",
29
    "ModelTypeEnum",
30
    "RoISelectionConfig",
31
    "SpatialModelConfig",
32
    "SpectralModelConfig",
33
    "Target",
34
    "add_ebl_model_from_config",
35
    "apply_selection_mask_to_models",
36
    "config_to_dict",
37
    "read_models_from_asgardpy_config",
38
    "set_models",
39
]
40
41
42
# Basic components to define the Target Config and any Models Config
43
class ModelTypeEnum(str, Enum):
44
    """
45
    Config section for Different Gammapy Model type.
46
    """
47
48
    skymodel = "SkyModel"
49
    fovbkgmodel = "FoVBackgroundModel"
50
51
52
class EBLAbsorptionModel(BaseConfig):
53
    """
54
    Config section for parameters to use for EBLAbsorptionNormSpectralModel.
55
    """
56
57
    filename: PathType = PathType("None")
58
    reference: str = ""
59
    type: str = "EBLAbsorptionNormSpectralModel"
60
    redshift: float = 0.0
61
    alpha_norm: float = 1.0
62
63
64
class ModelParams(BaseConfig):
65
    """Config section for parameters to use for a basic Parameter object."""
66
67
    name: str = ""
68
    value: float = 1
69
    unit: str = " "
70
    error: float = 0.1
71
    min: float = 0.1
72
    max: float = 10
73
    frozen: bool = True
74
75
76
class SpectralModelConfig(BaseConfig):
77
    """
78
    Config section for parameters to use for creating a SpectralModel object.
79
    """
80
81
    type: str = ""
82
    parameters: list[ModelParams] = [ModelParams()]
83
    ebl_abs: EBLAbsorptionModel = EBLAbsorptionModel()
84
85
86
class SpatialModelConfig(BaseConfig):
87
    """
88
    Config section for parameters to use for creating a SpatialModel object.
89
    """
90
91
    type: str = ""
92
    frame: FrameEnum = FrameEnum.icrs
93
    parameters: list[ModelParams] = [ModelParams()]
94
95
96
class ModelComponent(BaseConfig):
97
    """Config section for parameters to use for creating a SkyModel object."""
98
99
    name: str = ""
100
    type: ModelTypeEnum = ModelTypeEnum.skymodel
101
    datasets_names: list[str] = [""]
102
    spectral: SpectralModelConfig = SpectralModelConfig()
103
    spatial: SpatialModelConfig = SpatialModelConfig()
104
105
106
class RoISelectionConfig(BaseConfig):
107
    """
108
    Config section for parameters to perform some selection on FoV source
109
    models.
110
    """
111
112
    roi_radius: AngleType = 0 * u.deg
113
    free_sources: list[str] = []
114
115
116
class CatalogConfig(BaseConfig):
117
    """Config section for parameters to use information from Catalog."""
118
119
    name: str = ""
120
    selection_radius: AngleType = 0 * u.deg
121
    exclusion_radius: AngleType = 0 * u.deg
122
123
124
class Target(BaseConfig):
125
    """Config section for main information on creating various Models."""
126
127
    source_name: str = ""
128
    sky_position: SkyPositionConfig = SkyPositionConfig()
129
    use_uniform_position: bool = True
130
    models_file: PathType = PathType("None")
131
    datasets_with_fov_bkg_model: list[str] = []
132
    use_catalog: CatalogConfig = CatalogConfig()
133
    components: list[ModelComponent] = [ModelComponent()]
134
    covariance: str = ""
135
    from_3d: bool = False
136
    roi_selection: RoISelectionConfig = RoISelectionConfig()
137
138
139
class ExpCutoffLogParabolaSpectralModel(SpectralModel):
140
    r"""Spectral Exponential Cutoff Log Parabola model.
141
142
    Using a simple template from Gammapy.
143
144
    .. math::
145
        \phi(E) = \phi_0 \left( \frac{E}{E_0} \right) ^ {
146
          - \alpha_1 - \beta \log{ \left( \frac{E}{E_0} \right) }} \cdot
147
          \exp(- {(\lambda E})^{\alpha_2})
148
149
    Parameters
150
    ----------
151
    amplitude : `~astropy.units.Quantity`
152
        :math:`\phi_0`
153
    reference : `~astropy.units.Quantity`
154
        :math:`E_0`
155
    alpha_1 : `~astropy.units.Quantity`
156
        :math:`\alpha_1`
157
    beta : `~astropy.units.Quantity`
158
        :math:`\beta`
159
    lambda_ : `~astropy.units.Quantity`
160
        :math:`\lambda`
161
    alpha_2 : `~astropy.units.Quantity`
162
        :math:`\alpha_2`
163
    """
164
165
    tag = ["ExpCutoffLogParabolaSpectralModel", "ECLP"]
166
167
    amplitude = Parameter(
168
        "amplitude",
169
        "1e-12 cm-2 s-1 TeV-1",
170
        scale_method="scale10",
171
        interp="log",
172
        is_norm=True,
173
    )
174
    reference = Parameter("reference", "1 TeV", frozen=True)
175
    alpha_1 = Parameter("alpha_1", -2)
176
    alpha_2 = Parameter("alpha_2", 1, frozen=True)
177
    beta = Parameter("beta", 1)
178
    lambda_ = Parameter("lambda_", "0.1 TeV-1")
179
180
    @staticmethod
181
    def evaluate(energy, amplitude, reference, alpha_1, beta, lambda_, alpha_2):
182
        """Evaluate the model (static function)."""
183
        en_ref = energy / reference
184
        exponent = -alpha_1 - beta * np.log(en_ref)
185
        cutoff = np.exp(-np.power(energy * lambda_, alpha_2))
186
187
        return amplitude * np.power(en_ref, exponent) * cutoff
188
189
190
class BrokenPowerLaw2SpectralModel(SpectralModel):
191
    r"""Spectral broken power-law 2 model.
192
193
    In this slightly modified Broken Power Law, instead of having the second index
194
    as a distinct parameter, the difference in the spectral indices around the
195
    Break Energy is used, to try for some assumptions on the different physical
196
    processes that define the full spectrum, where the second process is dependent
197
    on the first process.
198
199
    For more information see :ref:`broken-powerlaw-spectral-model`.
200
201
    .. math::
202
        \phi(E) = \phi_0 \cdot \begin{cases}
203
                \left( \frac{E}{E_{break}} \right)^{-\Gamma_1} & \text{if } E < E_{break} \\
204
                \left( \frac{E}{E_{break}} \right)^{-(\Gamma_1 + \Delta\Gamma)} & \text{otherwise}
205
            \end{cases}
206
207
    Parameters
208
    ----------
209
    index1 : `~astropy.units.Quantity`
210
        :math:`\Gamma_1`
211
    index_diff : `~astropy.units.Quantity`
212
        :math:`\Delta\Gamma`
213
    amplitude : `~astropy.units.Quantity`
214
        :math:`\phi_0`
215
    ebreak : `~astropy.units.Quantity`
216
        :math:`E_{break}`
217
218
    See Also
219
    --------
220
    SmoothBrokenPowerLawSpectralModel
221
    """
222
223
    tag = ["BrokenPowerLaw2SpectralModel", "bpl2"]
224
    index1 = Parameter("index1", 2.0)
225
    index_diff = Parameter("index_diff", 1.0)
226
    amplitude = Parameter(
227
        name="amplitude",
228
        value="1e-12 cm-2 s-1 TeV-1",
229
        scale_method="scale10",
230
        interp="log",
231
        is_norm=True,
232
    )
233
    ebreak = Parameter("ebreak", "1 TeV")
234
235
    @staticmethod
236
    def evaluate(energy, index1, index_diff, amplitude, ebreak):
237
        """Evaluate the model (static function)."""
238
        energy = np.atleast_1d(energy)
239
        cond = energy < ebreak
240
        bpwl2 = amplitude * np.ones(energy.shape)
241
242
        index2 = index1 + index_diff
243
        bpwl2[cond] *= (energy[cond] / ebreak) ** (-index1)
244
        bpwl2[~cond] *= (energy[~cond] / ebreak) ** (-index2)
245
246
        return bpwl2
247
248
249
SPECTRAL_MODEL_REGISTRY.append(ExpCutoffLogParabolaSpectralModel)
250
SPECTRAL_MODEL_REGISTRY.append(BrokenPowerLaw2SpectralModel)
251
252
253
# Function for Models assignment
254
def extend_bkg_models(models, all_datasets, datasets_with_fov_bkg_model):
255
    """
256
    Function for extending a Background Model for a given 3D dataset name.
257
258
    It checks if the given dataset is of MapDataset or MapDatasetOnOff type and
259
    no associated background model exists already.
260
    """
261
    if len(datasets_with_fov_bkg_model) > 0:
262
        bkg_models = []
263
264
        for dataset in all_datasets:
265
            if dataset.name in datasets_with_fov_bkg_model:
266
                if "MapDataset" in dataset.tag and dataset.background_model is None:
267
                    bkg_models.append(FoVBackgroundModel(dataset_name=f"{dataset.name}-bkg"))
268
269
        models.extend(bkg_models)
270
271
    return models
272
273
274
def update_models_datasets_names(models, source_name, datasets_name_list):
275
    """
276
    Function to update the datasets_names list for the given list of models.
277
278
    If FoVBackgroundModel is provided, remove the -bkg part of the name of the
279
    dataset to get the appropriate datasets_name.
280
    """
281
    if len(models) > 1:
282
        models[source_name].datasets_names = datasets_name_list
283
284
        bkg_model_name = [m_name for m_name in models.names if "bkg" in m_name]
285
286
        if len(bkg_model_name) > 0:
287
            for bkg_name in bkg_model_name:
288
                models[bkg_name].datasets_names = [bkg_name[:-4]]
289
    else:
290
        models.datasets_names = datasets_name_list
291
292
    return models
293
294
295
def set_models(
296
    config_target,
297
    datasets,
298
    datasets_name_list=None,
299
    models=None,
300
):
301
    """
302
    Set models on given Datasets.
303
304
    Parameters
305
    ----------
306
    config_target: `AsgardpyConfig.target`
307
        AsgardpyConfig containing target information.
308
    datasets: `gammapy.datasets.Datasets`
309
        Datasets object
310
    dataset_name_list: list
311
        List of datasets_names to be used on the Models, before assigning them
312
        to the given datasets.
313
    models : `~gammapy.modeling.models.Models` or str of file location or None
314
        Models object or YAML models string
315
316
    Returns
317
    -------
318
    datasets: `gammapy.datasets.Datasets`
319
        Datasets object with Models assigned.
320
    """
321
    # Have some checks on argument types
322
    if isinstance(models, DatasetModels | list):
323
        models = Models(models)
324
    elif isinstance(models, PathType):
325
        models = Models.read(models)
326
    elif models is None:
327
        models = Models(models)
328
    else:
329
        raise TypeError(f"Invalid type: {type(models)}")
330
331
    if len(models) == 0:
332
        if config_target.components[0].name != "":
333
            models = read_models_from_asgardpy_config(config_target)
334
        else:
335
            raise ValueError("No input for Models provided for the Target source!")
336
    else:
337
        models = apply_selection_mask_to_models(
338
            list_sources=models,
339
            target_source=config_target.source_name,
340
            roi_radius=config_target.roi_selection.roi_radius,
341
            free_sources=config_target.roi_selection.free_sources,
342
        )
343
344
    models = extend_bkg_models(models, datasets, config_target.datasets_with_fov_bkg_model)
345
346
    if datasets_name_list is None:
347
        datasets_name_list = datasets.names
348
349
    models = update_models_datasets_names(models, config_target.source_name, datasets_name_list)
350
351
    datasets.models = models
352
353
    return datasets, models
354
355
356
def apply_models_mask_in_roi(list_sources_excluded, target_source, roi_radius, free_sources):
357
    """
358
    Function to apply a selection mask on a given list of models in a given RoI.
359
360
    The target source position is considered as the center of RoI.
361
362
    For a given list of non free sources, the spectral amplitude is left
363
    unfrozen or allowed to vary.
364
    """
365
    if not target_source:
366
        target_source = list_sources_excluded[0].name
367
        target_source_pos = list_sources_excluded[0].spatial_model.position
368
    else:
369
        target_source_pos = list_sources_excluded[target_source].spatial_model.position
370
371
    # If RoI radius is provided and is not default
372
    if roi_radius.to_value("deg") != 0:
373
        for model_ in list_sources_excluded:
374
            model_pos = model_.spatial_model.position
375
            separation = target_source_pos.separation(model_pos).deg
376
            if separation >= roi_radius.deg:
377
                model_.spectral_model.freeze()
378
    else:
379
        if len(free_sources) > 0:
380
            for model_ in list_sources_excluded:
381
                # Freeze all spectral parameters for other models
382
                if model_.name != target_source:
383
                    model_.spectral_model.freeze()
384
                # and now unfreeze the amplitude of selected models
385
                if model_.name in free_sources:
386
                    model_.spectral_model.parameters["amplitude"].frozen = False
387
388
    return list_sources_excluded
389
390
391
def apply_selection_mask_to_models(
392
    list_sources, target_source=None, selection_mask=None, roi_radius=0 * u.deg, free_sources=None
393
):
394
    """
395
    For a given list of source models, with a given target source, apply various
396
    selection masks on the Region of Interest in the sky. This will lead to
397
    complete exclusion of models or freezing some or all spectral parameters.
398
    These selections excludes the diffuse background models in the given list.
399
400
    First priority is given if a distinct selection mask is provided, with a
401
    list of excluded regions to return only the source models within the selected
402
    ROI.
403
404
    Second priority is on creating a Circular ROI as per the given radius, and
405
    freeze all the spectral parameters of the models of the sources.
406
407
    Third priority is when a list of free_sources is provided, then the
408
    spectral amplitude of models of those sources, if present in the given list
409
    of models, will be unfrozen, and hence, allowed to be variable for fitting.
410
411
    Parameters
412
    ----------
413
    list_sources: '~gammapy.modeling.models.Models'
414
        Models object containing a list of source models.
415
    target_source: 'str'
416
        Name of the target source, whose position is used as the center of ROI.
417
    selection_mask: 'WcsNDMap'
418
        Map containing a boolean mask to apply to Models object.
419
    roi_radius: 'astropy.units.Quantity' or 'asgardpy.data.base.AngleType'
420
        Radius for a circular region around ROI (deg)
421
    free_sources: 'list'
422
        List of source names for which the spectral amplitude is to be kept free.
423
424
    Returns
425
    -------
426
    list_sources: '~gammapy.modeling.models.Models'
427
        Selected Models object.
428
    """
429
    list_sources_excluded = []
430
    list_diffuse = []
431
432
    if free_sources is None:
433
        free_sources = []
434
435
    # Separate the list of sources and diffuse background
436
    for model_ in list_sources:
437
        if "diffuse" in model_.name or "bkg" in model_.name:
438
            list_diffuse.append(model_)
439
        else:
440
            list_sources_excluded.append(model_)
441
442
    list_sources_excluded = Models(list_sources_excluded)
443
444
    # If a distinct selection mask is provided
445
    if selection_mask:
446
        list_sources_excluded = list_sources_excluded.select_mask(selection_mask)
447
448
    list_sources_excluded = apply_models_mask_in_roi(
449
        list_sources_excluded, target_source, roi_radius, free_sources
450
    )
451
452
    # Add the diffuse background models back
453
    for diff_ in list_diffuse:
454
        list_sources_excluded.append(diff_)
455
456
    # Re-assign to the main variable
457
    list_sources = list_sources_excluded
458
459
    return list_sources
460
461
462
# Functions for Models generation
463
def add_ebl_model_from_config(spectral_model, model_config):
464
    """
465
    Function for adding the EBL model from a given AsgardpyConfig file to the
466
    given spectral model (assumed intrinsic).
467
    """
468
    ebl_model = model_config.spectral.ebl_abs
469
470
    # First check for filename of a custom EBL model
471
    if ebl_model.filename.is_file():
472
        model2 = EBLAbsorptionNormSpectralModel.read(str(ebl_model.filename), redshift=ebl_model.redshift)
473
        # Update the reference name when using the custom EBL model for logging
474
        ebl_model.reference = ebl_model.filename.name[:-8].replace("-", "_")
475
    else:
476
        model2 = EBLAbsorptionNormSpectralModel.read_builtin(ebl_model.reference, redshift=ebl_model.redshift)
477
    if ebl_model.alpha_norm:
478
        model2.alpha_norm.value = ebl_model.alpha_norm
479
480
    spectral_model *= model2
481
482
    return spectral_model
483
484
485
def read_models_from_asgardpy_config(config):
486
    """
487
    Reading Models information from AsgardpyConfig and return Models object.
488
    If a FoVBackgroundModel information is provided, it will also be added.
489
490
    Parameter
491
    ---------
492
    config: `AsgardpyConfig`
493
        Config section containing Target source information
494
495
    Returns
496
    -------
497
    models: `gammapy.modeling.models.Models`
498
        Full gammapy Models object.
499
    """
500
    models = Models()
501
502
    for model_config in config.components:
503
        # Spectral Model
504
        spectral_model = SPECTRAL_MODEL_REGISTRY.get_cls(model_config.spectral.type)().from_dict(
505
            {"spectral": config_to_dict(model_config.spectral)}
506
        )
507
        if model_config.spectral.ebl_abs.reference != "":
508
            spectral_model = add_ebl_model_from_config(spectral_model, model_config)
509
510
        spectral_model.name = config.source_name
511
512
        # Spatial model if provided
513
        if model_config.spatial.type != "":
514
            spatial_model = SPATIAL_MODEL_REGISTRY.get_cls(model_config.spatial.type)().from_dict(
515
                {"spatial": config_to_dict(model_config.spatial)}
516
            )
517
        else:
518
            spatial_model = None
519
520
        match model_config.type:
521
            case "SkyModel":
522
                models.append(
523
                    SkyModel(
524
                        spectral_model=spectral_model,
525
                        spatial_model=spatial_model,
526
                        name=config.source_name,
527
                    )
528
                )
529
530
            # FoVBackgroundModel is the second component
531
            case "FoVBackgroundModel":
532
                models.append(
533
                    FoVBackgroundModel(
534
                        spectral_model=spectral_model,
535
                        spatial_model=spatial_model,
536
                        dataset_name=model_config.datasets_names[0],
537
                    )
538
                )
539
540
    return models
541
542
543
def config_to_dict(model_config):
544
    """
545
    Convert the Spectral/Spatial models into dict.
546
    Probably an extra step and maybe removed later.
547
548
    Parameter
549
    ---------
550
    model_config: `AsgardpyConfig`
551
        Config section containing Target Model SkyModel components only.
552
553
    Returns
554
    -------
555
    model_dict: dict
556
        dictionary of the particular model.
557
    """
558
    model_dict = {}
559
    model_dict["type"] = str(model_config.type)
560
    model_dict["parameters"] = []
561
562
    for par in model_config.parameters:
563
        par_dict = {}
564
        par_dict["name"] = par.name
565
        par_dict["value"] = par.value
566
        par_dict["unit"] = par.unit
567
        par_dict["error"] = par.error
568
        par_dict["min"] = par.min
569
        par_dict["max"] = par.max
570
        par_dict["frozen"] = par.frozen
571
        model_dict["parameters"].append(par_dict)
572
573
    # For spatial model, include frame info
574
    if hasattr(model_config, "frame"):
575
        model_dict["frame"] = model_config.frame
576
577
    return model_dict
578