Passed
Pull Request — main (#155)
by Chaitanya
01:35
created

asgardpy.data.target   B

Complexity

Total Complexity 45

Size/Duplication

Total Lines 564
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 258
dl 0
loc 564
rs 8.8
c 0
b 0
f 0
wmc 45

2 Methods

Rating   Name   Duplication   Size   Complexity  
A ExpCutoffLogParabolaSpectralModel.evaluate() 0 8 1
A BrokenPowerLaw2SpectralModel.evaluate() 0 12 1

4 Functions

Rating   Name   Duplication   Size   Complexity  
F set_models() 0 82 15
C read_models_from_asgardpy_config() 0 97 11
A config_to_dict() 0 37 3
F apply_selection_mask_to_models() 0 87 14

How to fix   Complexity   

Complexity

Complex classes like asgardpy.data.target often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
Classes containing the Target config parameters for the high-level interface and
3
also the functions involving Models generation and assignment to datasets.
4
"""
5
6
from enum import Enum
7
from typing import List
8
9
import astropy.units as u
10
import numpy as np
11
from gammapy.modeling import Parameter
12
from gammapy.modeling.models import (
13
    SPATIAL_MODEL_REGISTRY,
14
    SPECTRAL_MODEL_REGISTRY,
15
    DatasetModels,
16
    EBLAbsorptionNormSpectralModel,
17
    FoVBackgroundModel,
18
    Models,
19
    SkyModel,
20
    SpectralModel,
21
)
22
23
from asgardpy.base.base import AngleType, BaseConfig, FrameEnum, PathType
24
from asgardpy.base.geom import SkyPositionConfig
25
26
__all__ = [
27
    "BrokenPowerLaw2SpectralModel",
28
    "EBLAbsorptionModel",
29
    "ExpCutoffLogParabolaSpectralModel",
30
    "ModelTypeEnum",
31
    "RoISelectionConfig",
32
    "SpatialModelConfig",
33
    "SpectralModelConfig",
34
    "Target",
35
    "apply_selection_mask_to_models",
36
    "config_to_dict",
37
    "read_models_from_asgardpy_config",
38
    "set_models",
39
]
40
41
42
# Basic components to define the Target Config and any Models Config
43
class ModelTypeEnum(str, Enum):
44
    """
45
    Config section for Different Gammapy Model type.
46
    """
47
48
    skymodel = "SkyModel"
49
    fovbkgmodel = "FoVBackgroundModel"
50
51
52
class EBLAbsorptionModel(BaseConfig):
53
    """
54
    Config section for parameters to use for EBLAbsorptionNormSpectralModel.
55
    """
56
57
    filename: PathType = PathType("None")
58
    reference: str = ""
59
    type: str = "EBLAbsorptionNormSpectralModel"
60
    redshift: float = 0.0
61
    alpha_norm: float = 1.0
62
63
64
class ModelParams(BaseConfig):
65
    """Config section for parameters to use for a basic Parameter object."""
66
67
    name: str = ""
68
    value: float = 1
69
    unit: str = " "
70
    error: float = 0.1
71
    min: float = 0.1
72
    max: float = 10
73
    frozen: bool = True
74
75
76
class SpectralModelConfig(BaseConfig):
77
    """
78
    Config section for parameters to use for creating a SpectralModel object.
79
    """
80
81
    type: str = ""
82
    parameters: List[ModelParams] = [ModelParams()]
83
    ebl_abs: EBLAbsorptionModel = EBLAbsorptionModel()
84
85
86
class SpatialModelConfig(BaseConfig):
87
    """
88
    Config section for parameters to use for creating a SpatialModel object.
89
    """
90
91
    type: str = ""
92
    frame: FrameEnum = FrameEnum.icrs
93
    parameters: List[ModelParams] = [ModelParams()]
94
95
96
class ModelComponent(BaseConfig):
97
    """Config section for parameters to use for creating a SkyModel object."""
98
99
    name: str = ""
100
    type: ModelTypeEnum = ModelTypeEnum.skymodel
101
    datasets_names: List[str] = [""]
102
    spectral: SpectralModelConfig = SpectralModelConfig()
103
    spatial: SpatialModelConfig = SpatialModelConfig()
104
105
106
class RoISelectionConfig(BaseConfig):
107
    """
108
    Config section for parameters to perform some selection on FoV source
109
    models.
110
    """
111
112
    roi_radius: AngleType = 0 * u.deg
113
    free_sources: List[str] = []
114
115
116
class CatalogConfig(BaseConfig):
117
    """Config section for parameters to use information from Catalog."""
118
119
    name: str = ""
120
    selection_radius: AngleType = 0 * u.deg
121
    exclusion_radius: AngleType = 0 * u.deg
122
123
124
class Target(BaseConfig):
125
    """Config section for main information on creating various Models."""
126
127
    source_name: str = ""
128
    sky_position: SkyPositionConfig = SkyPositionConfig()
129
    use_uniform_position: bool = True
130
    models_file: PathType = PathType("None")
131
    datasets_with_fov_bkg_model: List[str] = []
132
    use_catalog: CatalogConfig = CatalogConfig()
133
    components: List[ModelComponent] = [ModelComponent()]
134
    covariance: str = ""
135
    from_3d: bool = False
136
    roi_selection: RoISelectionConfig = RoISelectionConfig()
137
138
139
class ExpCutoffLogParabolaSpectralModel(SpectralModel):
140
    r"""Spectral Exponential Cutoff Log Parabola model.
141
142
    Using a simple template from Gammapy.
143
144
    .. math::
145
        \phi(E) = \phi_0 \left( \frac{E}{E_0} \right) ^ {
146
          - \alpha_1 - \beta \log{ \left( \frac{E}{E_0} \right) }} \cdot
147
          \exp(- {(\lambda E})^{\alpha_2})
148
149
    Parameters
150
    ----------
151
    amplitude : `~astropy.units.Quantity`
152
        :math:`\phi_0`
153
    reference : `~astropy.units.Quantity`
154
        :math:`E_0`
155
    alpha_1 : `~astropy.units.Quantity`
156
        :math:`\alpha_1`
157
    beta : `~astropy.units.Quantity`
158
        :math:`\beta`
159
    lambda_ : `~astropy.units.Quantity`
160
        :math:`\lambda`
161
    alpha_2 : `~astropy.units.Quantity`
162
        :math:`\alpha_2`
163
    """
164
165
    tag = ["ExpCutoffLogParabolaSpectralModel", "ECLP"]
166
167
    amplitude = Parameter(
168
        "amplitude",
169
        "1e-12 cm-2 s-1 TeV-1",
170
        scale_method="scale10",
171
        interp="log",
172
        is_norm=True,
173
    )
174
    reference = Parameter("reference", "1 TeV", frozen=True)
175
    alpha_1 = Parameter("alpha_1", -2)
176
    alpha_2 = Parameter("alpha_2", 1, frozen=True)
177
    beta = Parameter("beta", 1)
178
    lambda_ = Parameter("lambda_", "0.1 TeV-1")
179
180
    @staticmethod
181
    def evaluate(energy, amplitude, reference, alpha_1, beta, lambda_, alpha_2):
182
        """Evaluate the model (static function)."""
183
        en_ref = energy / reference
184
        exponent = -alpha_1 - beta * np.log(en_ref)
185
        cutoff = np.exp(-np.power(energy * lambda_, alpha_2))
186
187
        return amplitude * np.power(en_ref, exponent) * cutoff
188
189
190
class BrokenPowerLaw2SpectralModel(SpectralModel):
191
    r"""Spectral broken power-law 2 model.
192
193
    In this slightly modified Broken Power Law, instead of having the second index
194
    as a distinct parameter, the difference in the spectral indices around the
195
    Break Energy is used, to try for some assumptions on the different physical
196
    processes that define the full spectrum, where the second process is dependent
197
    on the first process.
198
199
    For more information see :ref:`broken-powerlaw-spectral-model`.
200
201
    .. math::
202
        \phi(E) = \phi_0 \cdot \begin{cases}
203
                \left( \frac{E}{E_{break}} \right)^{-\Gamma_1} & \text{if } E < E_{break} \\
204
                \left( \frac{E}{E_{break}} \right)^{-(\Gamma_1 + \Delta\Gamma)} & \text{otherwise}
205
            \end{cases}
206
207
    Parameters
208
    ----------
209
    index1 : `~astropy.units.Quantity`
210
        :math:`\Gamma_1`
211
    index_diff : `~astropy.units.Quantity`
212
        :math:`\Delta\Gamma`
213
    amplitude : `~astropy.units.Quantity`
214
        :math:`\phi_0`
215
    ebreak : `~astropy.units.Quantity`
216
        :math:`E_{break}`
217
218
    See Also
219
    --------
220
    SmoothBrokenPowerLawSpectralModel
221
    """
222
223
    tag = ["BrokenPowerLaw2SpectralModel", "bpl2"]
224
    index1 = Parameter("index1", 2.0)
225
    index_diff = Parameter("index_diff", 1.0)
226
    amplitude = Parameter(
227
        name="amplitude",
228
        value="1e-12 cm-2 s-1 TeV-1",
229
        scale_method="scale10",
230
        interp="log",
231
        is_norm=True,
232
    )
233
    ebreak = Parameter("ebreak", "1 TeV")
234
235
    @staticmethod
236
    def evaluate(energy, index1, index_diff, amplitude, ebreak):
237
        """Evaluate the model (static function)."""
238
        energy = np.atleast_1d(energy)
239
        cond = energy < ebreak
240
        bpwl2 = amplitude * np.ones(energy.shape)
241
242
        index2 = index1 + index_diff
243
        bpwl2[cond] *= (energy[cond] / ebreak) ** (-index1)
244
        bpwl2[~cond] *= (energy[~cond] / ebreak) ** (-index2)
245
246
        return bpwl2
247
248
249
SPECTRAL_MODEL_REGISTRY.append(ExpCutoffLogParabolaSpectralModel)
250
SPECTRAL_MODEL_REGISTRY.append(BrokenPowerLaw2SpectralModel)
251
252
253
# Function for Models assignment
254
def set_models(
255
    config_target,
256
    datasets,
257
    datasets_name_list=None,
258
    models=None,
259
):
260
    """
261
    Set models on given Datasets.
262
263
    Parameters
264
    ----------
265
    config_target: `AsgardpyConfig.target`
266
        AsgardpyConfig containing target information.
267
    datasets: `gammapy.datasets.Datasets`
268
        Datasets object
269
    dataset_name_list: List
270
        List of datasets_names to be used on the Models, before assigning them
271
        to the given datasets.
272
    models : `~gammapy.modeling.models.Models` or str of file location or None
273
        Models object or YAML models string
274
275
    Returns
276
    -------
277
    datasets: `gammapy.datasets.Datasets`
278
        Datasets object with Models assigned.
279
    """
280
    # Have some checks on argument types
281
    if isinstance(models, (DatasetModels, list)):
282
        models = Models(models)
283
    elif isinstance(models, PathType):
284
        models = Models.read(models)
285
    elif models is None:
286
        models = Models(models)
287
    else:
288
        raise TypeError(f"Invalid type: {type(models)}")
289
290
    if len(models) == 0:
291
        if config_target.components[0].name != "":
292
            models = read_models_from_asgardpy_config(config_target)
293
        else:
294
            raise Exception("No input for Models provided for the Target source!")
295
    else:
296
        models = apply_selection_mask_to_models(
297
            list_sources=models,
298
            target_source=config_target.source_name,
299
            roi_radius=config_target.roi_selection.roi_radius,
300
            free_sources=config_target.roi_selection.free_sources,
301
        )
302
303
    if len(config_target.datasets_with_fov_bkg_model) > 0:
304
        # For extending a Background Model for a given 3D dataset name
305
        bkg_models = []
306
307
        for dataset in datasets:
308
            if dataset.name in config_target.datasets_with_fov_bkg_model:
309
                # Check if it is of MapDataset or MapDatasetOnOff type and
310
                # no associated background model exists already.
311
                if "MapDataset" in dataset.tag and dataset.background_model is None:
312
                    bkg_models.append(FoVBackgroundModel(dataset_name=f"{dataset.name}-bkg"))
313
314
        models.extend(bkg_models)
315
316
    if datasets_name_list is None:
317
        datasets_name_list = datasets.names
318
319
    if len(models) > 1:
320
        models[config_target.source_name].datasets_names = datasets_name_list
321
322
        # Check if FoVBackgroundModel is provided
323
        bkg_model_name = [m_name for m_name in models.names if "bkg" in m_name]
324
325
        if len(bkg_model_name) > 0:
326
            # Remove the -bkg part of the name of the FoVBackgroundModel to get
327
            # the appropriate datasets name.
328
            for bkg_name in bkg_model_name:
329
                models[bkg_name].datasets_names = [bkg_name[:-4]]
330
    else:
331
        models.datasets_names = datasets_name_list
332
333
    datasets.models = models
334
335
    return datasets, models
336
337
338
def apply_selection_mask_to_models(
339
    list_sources, target_source=None, selection_mask=None, roi_radius=0 * u.deg, free_sources=[]
340
):
341
    """
342
    For a given list of source models, with a given target source, apply various
343
    selection masks on the Region of Interest in the sky. This will lead to
344
    complete exclusion of models or freezing some or all spectral parameters.
345
    These selections excludes the diffuse background models in the given list.
346
347
    First priority is given if a distinct selection mask is provided, with a
348
    list of excluded regions to return only the source models within the selected
349
    ROI.
350
351
    Second priority is on creating a Circular ROI as per the given radius, and
352
    freeze all the spectral parameters of the models of the sources.
353
354
    Third priority is when a list of free_sources is provided, then the
355
    spectral amplitude of models of those sources, if present in the given list
356
    of models, will be unfrozen, and hence, allowed to be variable for fitting.
357
358
    Parameters
359
    ----------
360
    list_sources: '~gammapy.modeling.models.Models'
361
        Models object containing a list of source models.
362
    target_source: 'str'
363
        Name of the target source, whose position is used as the center of ROI.
364
    selection_mask: 'WcsNDMap'
365
        Map containing a boolean mask to apply to Models object.
366
    roi_radius: 'astropy.units.Quantity' or 'asgardpy.data.base.AngleType'
367
        Radius for a circular region around ROI (deg)
368
    free_sources: 'list'
369
        List of source names for which the spectral amplitude is to be kept free.
370
371
    Returns
372
    -------
373
    list_sources: '~gammapy.modeling.models.Models'
374
        Selected Models object.
375
    """
376
    list_sources_excluded = []
377
    list_diffuse = []
378
379
    # Separate the list of sources and diffuse background
380
    for model_ in list_sources:
381
        if "diffuse" in model_.name or "bkg" in model_.name:
382
            list_diffuse.append(model_)
383
        else:
384
            list_sources_excluded.append(model_)
385
386
    list_sources_excluded = Models(list_sources_excluded)
387
388
    # Get the target source position as the center of RoI
389
    if not target_source:
390
        target_source = list_sources_excluded[0].name
391
        target_source_pos = target_source.spatial_model.position
392
    else:
393
        target_source_pos = list_sources_excluded[target_source].spatial_model.position
394
395
    # If a distinct selection mask is provided
396
    if selection_mask:
397
        list_sources_excluded = list_sources_excluded.select_mask(selection_mask)
398
399
    # If RoI radius is provided and is not default
400
    if roi_radius.to_value("deg") != 0:
401
        for model_ in list_sources_excluded:
402
            model_pos = model_.spatial_model.position
403
            separation = target_source_pos.separation(model_pos).deg
404
            if separation >= roi_radius.deg:
405
                model_.spectral_model.freeze()
406
    else:
407
        # For a given list of non free sources, unfreeze the spectral amplitude
408
        if len(free_sources) > 0:
409
            for model_ in list_sources_excluded:
410
                # Freeze all spectral parameters for other models
411
                if model_.name != target_source:
412
                    model_.spectral_model.freeze()
413
                # and now unfreeze the amplitude of selected models
414
                if model_.name in free_sources:
415
                    model_.spectral_model.parameters["amplitude"].frozen = False
416
417
    # Add the diffuse background models back
418
    for diff_ in list_diffuse:
419
        list_sources_excluded.append(diff_)
420
421
    # Re-assign to the main variable
422
    list_sources = list_sources_excluded
423
424
    return list_sources
425
426
427
# Functions for Models generation
428
def read_models_from_asgardpy_config(config):
429
    """
430
    Reading Models information from AsgardpyConfig and return Models object.
431
    If a FoVBackgroundModel information is provided, it will also be added.
432
433
    Parameter
434
    ---------
435
    config: `AsgardpyConfig`
436
        Config section containing Target source information
437
438
    Returns
439
    -------
440
    models: `gammapy.modeling.models.Models`
441
        Full gammapy Models object.
442
    """
443
    models = Models()
444
445
    for model_config in config.components:
446
        if model_config.type == "SkyModel":
447
            # Spectral Model
448
            if model_config.spectral.ebl_abs.reference != "":
449
                model1 = SPECTRAL_MODEL_REGISTRY.get_cls(model_config.spectral.type)().from_dict(
450
                    {"spectral": config_to_dict(model_config.spectral)}
451
                )
452
453
                ebl_model = model_config.spectral.ebl_abs
454
455
                # First check for filename of a custom EBL model
456
                if ebl_model.filename.is_file():
457
                    model2 = EBLAbsorptionNormSpectralModel.read(
458
                        str(ebl_model.filename), redshift=ebl_model.redshift
459
                    )
460
                    # Update the reference name when using the custom EBL model for logging
461
                    ebl_model.reference = ebl_model.filename.name[:-8].replace("-", "_")
462
                else:
463
                    model2 = EBLAbsorptionNormSpectralModel.read_builtin(
464
                        ebl_model.reference, redshift=ebl_model.redshift
465
                    )
466
                if ebl_model.alpha_norm:
467
                    model2.alpha_norm.value = ebl_model.alpha_norm
468
                spectral_model = model1 * model2
469
            else:
470
                if model_config.spectral.type == "ExpCutoffLogParabolaSpectralModel":
471
                    spectral_model = ExpCutoffLogParabolaSpectralModel().from_dict(
472
                        {"spectral": config_to_dict(model_config.spectral)}
473
                    )
474
                elif model_config.spectral.type == "BrokenPowerLaw2SpectralModel":
475
                    spectral_model = BrokenPowerLaw2SpectralModel().from_dict(
476
                        {"spectral": config_to_dict(model_config.spectral)}
477
                    )
478
                else:
479
                    spectral_model = SPECTRAL_MODEL_REGISTRY.get_cls(model_config.spectral.type)().from_dict(
480
                        {"spectral": config_to_dict(model_config.spectral)}
481
                    )
482
            spectral_model.name = config.source_name
483
484
            # Spatial model if provided
485
            if model_config.spatial.type != "":
486
                spatial_model = SPATIAL_MODEL_REGISTRY.get_cls(model_config.spatial.type)().from_dict(
487
                    {"spatial": config_to_dict(model_config.spatial)}
488
                )
489
            else:
490
                spatial_model = None
491
492
            models.append(
493
                SkyModel(
494
                    spectral_model=spectral_model,
495
                    spatial_model=spatial_model,
496
                    name=config.source_name,
497
                )
498
            )
499
500
        # FoVBackgroundModel is the second component
501
        if model_config.type == "FoVBackgroundModel":
502
            # Spectral Model. At least this has to be provided for distinct
503
            # parameter values
504
            spectral_model_fov = SPECTRAL_MODEL_REGISTRY.get_cls(model_config.spectral.type)().from_dict(
505
                {"spectral": config_to_dict(model_config.spectral)}
506
            )
507
508
            # Spatial model if provided
509
            if model_config.spatial.type != "":
510
                spatial_model_fov = SPATIAL_MODEL_REGISTRY.get_cls(model_config.spatial.type)().from_dict(
511
                    {"spatial": config_to_dict(model_config.spatial)}
512
                )
513
            else:
514
                spatial_model_fov = None
515
516
            models.append(
517
                FoVBackgroundModel(
518
                    spectral_model=spectral_model_fov,
519
                    spatial_model=spatial_model_fov,
520
                    dataset_name=model_config.datasets_names[0],
521
                )
522
            )
523
524
    return models
525
526
527
def config_to_dict(model_config):
528
    """
529
    Convert the Spectral/Spatial models into dict.
530
    Probably an extra step and maybe removed later.
531
532
    Parameter
533
    ---------
534
    model_config: `AsgardpyConfig`
535
        Config section containing Target Model SkyModel components only.
536
537
    Returns
538
    -------
539
    model_dict: dict
540
        dictionary of the particular model.
541
    """
542
    model_dict = {}
543
    model_dict["type"] = str(model_config.type)
544
    model_dict["parameters"] = []
545
546
    for par in model_config.parameters:
547
        par_dict = {}
548
        par_dict["name"] = par.name
549
        par_dict["value"] = par.value
550
        par_dict["unit"] = par.unit
551
        par_dict["error"] = par.error
552
        par_dict["min"] = par.min
553
        par_dict["max"] = par.max
554
        par_dict["frozen"] = par.frozen
555
        model_dict["parameters"].append(par_dict)
556
557
    # For spatial model, include frame info
558
    try:
559
        getattr(model_dict, "frame")
560
    except AttributeError:
561
        pass
562
563
    return model_dict
564