Passed
Pull Request — main (#157)
by Chaitanya
04:40 queued 02:37
created

asgardpy.gammapy.interoperate_models   F

Complexity

Total Complexity 60

Size/Duplication

Total Lines 359
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 186
dl 0
loc 359
rs 3.6
c 0
b 0
f 0
wmc 60

How to fix   Complexity   

Complexity

Complex classes like asgardpy.gammapy.interoperate_models often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
Functions for having interoperatibility of Models type objects with Gammapy
3
objects.
4
"""
5
from astropy.coordinates import SkyCoord
6
from gammapy.maps import Map
7
from gammapy.modeling import Parameter, Parameters
8
from gammapy.modeling.models import SPATIAL_MODEL_REGISTRY, SPECTRAL_MODEL_REGISTRY
9
10
__all__ = [
11
    "get_gammapy_spectral_model",
12
    "params_renaming_to_gammapy",
13
    "params_rescale_to_gammapy",
14
    "xml_spatial_model_to_gammapy",
15
    "xml_spectral_model_to_gammapy",
16
]
17
18
19
def get_gammapy_spectral_model(spectrum_type, ebl_atten=False, base_model_type="Fermi-XML"):
20
    """
21
    Getting the correct Gammapy SpectralModel object after reading a name from a
22
    different base_model_type.
23
24
    Parameter
25
    ---------
26
    spectrum_type: str
27
        Spectral Model type as written in the base model.
28
    ebl_atten: bool
29
        If EBL attenuated spectral model needs different treatment.
30
    base_model_type: str
31
        Name indicating the XML format used to read the skymodels from.
32
33
    Return
34
    ------
35
    spectral_model: `gammapy.modleing.models.SpectralModel`
36
        Gammapy SpectralModel object corresponding to the provided name.
37
    ebl_atten: bool
38
        Boolean for EBL attenuated spectral model.
39
    """
40
    if base_model_type == "Fermi-XML":
41
        spectrum_type_no_ebl = spectrum_type.split("EblAtten::")[-1]
42
43
        match spectrum_type_no_ebl:
44
            case "PLSuperExpCutoff" | "PLSuperExpCutoff2":
45
                spectrum_type_final = "ExpCutoffPowerLawSpectralModel"
46
            case "PLSuperExpCutoff4":
47
                spectrum_type_final = "SuperExpCutoffPowerLaw4FGLDR3SpectralModel"
48
            case _:
49
                spectrum_type_final = f"{spectrum_type_no_ebl}SpectralModel"
50
51
        spectral_model = SPECTRAL_MODEL_REGISTRY.get_cls(spectrum_type_final)()
52
53
        if spectrum_type_no_ebl == "LogParabola":
54
            if "EblAtten" in spectrum_type:
55
                spectral_model = SPECTRAL_MODEL_REGISTRY.get_cls("PowerLawSpectralModel")()
56
                ebl_atten = True
57
            else:
58
                spectral_model = SPECTRAL_MODEL_REGISTRY.get_cls("LogParabolaSpectralModel")()
59
60
    return spectral_model, ebl_atten
61
62
63
def rename_fermi_energy_params(param_name):
64
    """
65
    Simple match-case for renaming Energy parameters from Fermi-XML model to
66
    Gammapy standard.
67
    """
68
    match param_name:
69
        case "scale" | "eb":
70
            return "reference"
71
72
        case "breakvalue":
73
            return "ebreak"
74
75
        case "lowerlimit":
76
            return "emin"
77
78
        case "upperlimit":
79
            return "emax"
80
81
82
def rename_fermi_index_params(param_name, spectrum_type):
83
    """
84
    Simple match-case for renaming spectral index parameters from Fermi-XML
85
    model to Gammapy standard.
86
87
    Some spectral models share common names in Fermi-XML but not in Gammapy.
88
    """
89
    match param_name:
90
        case "index":
91
            return "index"
92
93
        case "index1":
94
            match spectrum_type:
95
                case "PLSuperExpCutoff" | "PLSuperExpCutoff2":
96
                    return "index"
97
                case _:
98
                    return "index1"  # For spectrum_type == "BrokenPowerLaw"
99
100
        case "indexs":
101
            return "index_1"  # For spectrum_type == "PLSuperExpCutoff4"
102
103
        case "index2":
104
            match spectrum_type:
105
                case "PLSuperExpCutoff4":
106
                    return "index_2"
107
                case "PLSuperExpCutoff" | "PLSuperExpCutoff2":
108
                    return "alpha"
109
                case _:
110
                    return "index2"  # For spectrum_type == "BrokenPowerLaw"
111
112
113
def params_renaming_to_gammapy(params_1_name, params_gpy, spectrum_type, params_1_base_model="Fermi-XML"):
114
    """
115
    Reading from a given parameter name, get basic parameters details like name,
116
    unit and is_norm as per Gammapy definition.
117
118
    This function collects all such switch cases, based on the base model of the
119
    given set of parameters.
120
    """
121
    if params_1_base_model == "Fermi-XML":
122
        match params_1_name:
123
            case "norm" | "prefactor" | "integral":
124
                params_gpy["name"] = "amplitude"
125
                params_gpy["is_norm"] = True
126
127
                match spectrum_type:
128
                    case "PowerLaw2":
129
                        params_gpy["unit"] = "cm-2 s-1"
130
                    case _:
131
                        params_gpy["unit"] = "cm-2 s-1 TeV-1"
132
133
            case "scale" | "eb" | "breakvalue" | "lowerlimit" | "upperlimit":
134
                params_gpy["name"] = rename_fermi_energy_params(params_1_name)
135
136
            case "index" | "index1" | "indexs" | "index2":
137
                params_gpy["name"] = rename_fermi_index_params(params_1_name, spectrum_type)
138
139
            case "cutoff" | "expfactor":
140
                params_gpy["name"] = "lambda_"
141
                params_gpy["unit"] = "TeV-1"
142
143
            case "expfactors":
144
                params_gpy["name"] = "expfactor"
145
146
    return params_gpy
147
148
149
def rescale_parameters(params_dict, scale_factor):
150
    """
151
    Function to rescale the value of the parameters as per Gammapy standard,
152
    using a multiplying factor, considering the final scale value to be 1.
153
    """
154
    params_dict["value"] = float(params_dict["value"]) * float(params_dict["scale"]) * scale_factor
155
156
    if "error" in params_dict:
157
        params_dict["error"] = float(params_dict["error"]) * float(params_dict["scale"]) * scale_factor
158
159
    if scale_factor == -1:
160
        # Reverse the limits while changing the sign
161
        min_ = float(params_dict["min"]) * float(params_dict["scale"]) * scale_factor
162
        max_ = float(params_dict["max"]) * float(params_dict["scale"]) * scale_factor
163
164
        params_dict["min"] = min(min_, max_)
165
        params_dict["max"] = max(min_, max_)
166
    else:
167
        params_dict["min"] = float(params_dict["min"]) * float(params_dict["scale"]) * scale_factor
168
        params_dict["max"] = float(params_dict["max"]) * float(params_dict["scale"]) * scale_factor
169
170
    params_dict["scale"] = 1.0
171
172
    return params_dict
173
174
175
def invert_parameters(params_dict, scale_factor):
176
    """
177
    Function to rescale the value of the parameters as per Gammapy standard,
178
    when the main parameter value needs to be inverted, considering the final
179
    scale value to be 1.
180
    """
181
    val_ = float(params_dict["value"]) * float(params_dict["scale"])
182
183
    params_dict["value"] = scale_factor / val_
184
185
    if "error" in params_dict:
186
        params_dict["error"] = scale_factor * float(params_dict["error"]) / (val_**2)
187
188
    min_ = scale_factor / (float(params_dict["min"]) * float(params_dict["scale"]))
189
    max_ = scale_factor / (float(params_dict["max"]) * float(params_dict["scale"]))
190
191
    params_dict["min"] = min(min_, max_)
192
    params_dict["max"] = max(min_, max_)
193
194
    params_dict["scale"] = 1.0
195
196
    return params_dict
197
198
199
def params_rescale_to_gammapy(params_gpy, spectrum_type, en_scale_1_to_gpy=1.0e-6, keep_sign=False):
200
    """
201
    Rescaling parameters to be used with Gammapy standard units, by using the
202
    various physical factors (energy for now), compared with the initial set of
203
    parameters as compared with Gammapy standard units.
204
205
    Also, scales the value, min and max of the given parameters, depending on
206
    their Parameter names.
207
    """
208
    match params_gpy["name"]:
209
        case "reference" | "ebreak" | "emin" | "emax":
210
            params_gpy["unit"] = "TeV"
211
            params_gpy = rescale_parameters(params_gpy, en_scale_1_to_gpy)
212
213
        case "amplitude":
214
            params_gpy = rescale_parameters(params_gpy, 1 / en_scale_1_to_gpy)
215
216
        case "index" | "index_1" | "index_2" | "beta":
217
            if not keep_sign:
218
                # Other than EBL Attenuated Power Law?
219
                # spectral indices in gammapy are always taken as positive values.
220
                val_ = float(params_gpy["value"]) * float(params_gpy["scale"])
221
                if val_ < 0:
222
                    params_gpy = rescale_parameters(params_gpy, -1)
223
224
        case "lambda_":
225
            if spectrum_type == "PLSuperExpCutoff":
226
                # Original parameter is inverse of what gammapy uses
227
                params_gpy = invert_parameters(params_gpy, en_scale_1_to_gpy)
228
229
    if float(params_gpy["scale"]) != 1.0:
230
        params_gpy = rescale_parameters(params_gpy, 1)
231
232
    return params_gpy
233
234
235
def param_dict_to_gammapy_parameter(new_par):
236
    """Read a dict object into Gammapy Parameter object."""
237
    new_param = Parameter(name=new_par["name"], value=new_par["value"])
238
239
    if "error" in new_par:
240
        new_param.error = new_par["error"]
241
242
    new_param.min = new_par["min"]
243
    new_param.max = new_par["max"]
244
    new_param.unit = new_par["unit"]
245
    new_param.frozen = new_par["frozen"]
246
    new_param._is_norm = new_par["is_norm"]
247
248
    return new_param
249
250
251
def xml_spectral_model_to_gammapy(
252
    params, spectrum_type, is_target=False, keep_sign=False, base_model_type="Fermi-XML"
253
):
254
    """
255
    Convert the Spectral Models information from XML model of FermiTools to Gammapy
256
    standards and return Parameters list.
257
    Details of the XML model can be seen at
258
    https://fermi.gsfc.nasa.gov/ssc/data/analysis/scitools/source_models.html
259
    and with examples at
260
    https://fermi.gsfc.nasa.gov/ssc/data/analysis/scitools/xml_model_defs.html
261
262
    Models from the XML model, not read are -
263
    ExpCutoff (Blazar modeling with EBL absorption included),
264
    BPLExpCutoff,
265
    PLSuperExpCutoff3 (Pulsar modeling),
266
    BandFunction (GRB analysis)
267
268
    Parameters
269
    ----------
270
    params: `gammapy.modeling.Parameters`
271
        List of gammapy Parameter object of a particular Model
272
    spectrum_type: str
273
        Spectrum type as defined in XML. To be used only for special cases like
274
        PLSuperExpCutoff, PLSuperExpCutoff2 and PLSuperExpCutoff4
275
    is_target: bool
276
        Boolean to check if the given list of Parameters belong to the target
277
        source model or not.
278
    keep_sign: bool
279
        Boolean to keep the same sign on the parameter values or not.
280
    base_model_type: str
281
        Name indicating the XML format used to read the skymodels from.
282
283
    Returns
284
    -------
285
    params_final: `gammapy.modeling.Parameters`
286
        Final list of gammapy Parameter object
287
    """
288
    new_params = []
289
290
    # Some modifications on scaling/sign
291
    # By default for base_model_type == "Fermi-XML"
292
    en_scale_1_to_gpy = 1.0e-6
293
294
    for par in params:
295
        new_par = {}
296
297
        for key_ in par.keys():
298
            # Getting the "@par_name" for each parameter without the "@"
299
            if key_ != "@free":
300
                new_par[key_[1:].lower()] = par[key_]
301
            else:
302
                if par["@name"].lower() not in ["scale", "eb"]:
303
                    new_par["frozen"] = (par[key_] == "0") and not is_target
304
                else:
305
                    # Never change frozen status of Reference Energy
306
                    new_par["frozen"] = par[key_] == "0"
307
            new_par["unit"] = ""
308
            new_par["is_norm"] = False
309
310
            # Using the nomenclature as used in Gammapy
311
            new_par = params_renaming_to_gammapy(
312
                par["@name"].lower(), new_par, spectrum_type, params_1_base_model=base_model_type
313
            )
314
315
        new_par = params_rescale_to_gammapy(
316
            new_par, spectrum_type, en_scale_1_to_gpy=en_scale_1_to_gpy, keep_sign=keep_sign
317
        )
318
319
        if base_model_type == "Fermi-XML":
320
            if new_par["name"] == "alpha" and spectrum_type in [
321
                "PLSuperExpCutoff",
322
                "PLSuperExpCutoff2",
323
            ]:
324
                new_par["frozen"] = par["@free"] == "0"
325
326
        new_params.append(param_dict_to_gammapy_parameter(new_par))
327
328
    params_final2 = Parameters(new_params)
329
330
    # Modifications when different parameters are interconnected
331
    if base_model_type == "Fermi-XML":
332
        if spectrum_type == "PLSuperExpCutoff2":
333
            alpha_inv = 1 / params_final2["alpha"].value
334
            min_sign = 1
335
            if params_final2["lambda_"].min < 0:
336
                min_sign = -1
337
338
            params_final2["lambda_"].value = params_final2["lambda_"].value ** alpha_inv / en_scale_1_to_gpy
339
            params_final2["lambda_"].min = min_sign * (
340
                abs(params_final2["lambda_"].min) ** alpha_inv / en_scale_1_to_gpy
341
            )
342
            params_final2["lambda_"].max = params_final2["lambda_"].max ** alpha_inv / en_scale_1_to_gpy
343
344
    return params_final2
345
346
347
def fetch_spatial_galactic_frame(spatial_params, sigma=False):
348
    """
349
    Function to get the galactic frame from a spatial model written in
350
    Fermi-XML format.
351
352
    If the value of spatial extension, sigma is required, it will also be fetched.
353
    """
354
    if not sigma:
355
        sigma = None
356
357
    for par_ in spatial_params:
358
        match par_["@name"]:
359
            case "RA":
360
                lon_0 = f"{par_['@value']} deg"
361
            case "DEC":
362
                lat_0 = f"{par_['@value']} deg"
363
            case "Sigma":
364
                sigma = f"{par_['@value']} deg"
365
    fk5_frame = SkyCoord(
366
        lon_0,
367
        lat_0,
368
        frame="fk5",
369
    )
370
371
    return fk5_frame.transform_to("galactic"), sigma
372
373
374
def xml_spatial_model_to_gammapy(aux_path, xml_spatial_model, base_model_type="Fermi-XML"):
375
    """
376
    Read the spatial model component of the XMl model to Gammapy SpatialModel
377
    object.
378
379
    Details of the Fermi-XML model can be seen at
380
    https://fermi.gsfc.nasa.gov/ssc/data/analysis/scitools/source_models.html
381
    and with examples at
382
    https://fermi.gsfc.nasa.gov/ssc/data/analysis/scitools/xml_model_defs.html
383
384
    Parameters
385
    ----------
386
    aux_path: `Path`
387
        Path to the template diffuse models
388
    xml_spatial_model: `dict`
389
        Spatial Model component of a particular source from the XML file
390
    base_model_type: str
391
        Name indicating the XML format used to read the skymodels from.
392
393
    Returns
394
    -------
395
    spatial_model: `gammapy.modeling.models.SpatialModel`
396
        Gammapy Spatial Model object
397
    """
398
    spatial_pars = xml_spatial_model["parameter"]
399
400
    if base_model_type == "Fermi-XML":
401
        match xml_spatial_model["@type"]:
402
            case "SkyDirFunction":
403
                gal_frame, _ = fetch_spatial_galactic_frame(spatial_pars, sigma=False)
404
                spatial_model = SPATIAL_MODEL_REGISTRY.get_cls("PointSpatialModel")().from_position(gal_frame)
405
406
            case "SpatialMap":
407
                file_name = xml_spatial_model["@file"].split("/")[-1]
408
                file_path = aux_path / f"Templates/{file_name}"
409
410
                spatial_map = Map.read(file_path)
411
                spatial_map = spatial_map.copy(unit="sr^-1")
412
413
                spatial_model = SPATIAL_MODEL_REGISTRY.get_cls("TemplateSpatialModel")(
414
                    spatial_map, filename=file_path
415
                )
416
417
            case "RadialGaussian":
418
                gal_frame, sigma = fetch_spatial_galactic_frame(spatial_pars, sigma=True)
419
                spatial_model = SPATIAL_MODEL_REGISTRY.get_cls("GaussianSpatialModel")(
420
                    lon_0=gal_frame.l, lat_0=gal_frame.b, sigma=sigma, frame="galactic"
421
                )
422
423
    return spatial_model
424