Passed
Pull Request — main (#157)
by Chaitanya
01:53
created

asgardpy.gammapy.interoperate_models   F

Complexity

Total Complexity 60

Size/Duplication

Total Lines 359
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 186
dl 0
loc 359
rs 3.6
c 0
b 0
f 0
wmc 60

How to fix   Complexity   

Complexity

Complex classes like asgardpy.gammapy.interoperate_models often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
Functions for having interoperatibility of Models type objects with Gammapy
3
objects.
4
"""
5
from astropy.coordinates import SkyCoord
6
from gammapy.maps import Map
7
from gammapy.modeling import Parameter, Parameters
8
from gammapy.modeling.models import SPATIAL_MODEL_REGISTRY, SPECTRAL_MODEL_REGISTRY
9
10
__all__ = [
11
    "get_gammapy_spectral_model",
12
    "params_renaming_to_gammapy",
13
    "params_rescale_to_gammapy",
14
    "xml_spatial_model_to_gammapy",
15
    "xml_spectral_model_to_gammapy",
16
]
17
18
19
def get_gammapy_spectral_model(spectrum_type, ebl_atten=False, base_model_type="Fermi-XML"):
20
    """
21
    Getting the correct Gammapy SpectralModel object after reading a name from a
22
    different base_model_type.
23
24
    Parameter
25
    ---------
26
    spectrum_type: str
27
        Spectral Model type as written in the base model.
28
    ebl_atten: bool
29
        If EBL attenuated spectral model needs different treatment.
30
    base_model_type: str
31
        Name indicating the XML format used to read the skymodels from.
32
33
    Return
34
    ------
35
    spectral_model: `gammapy.modleing.models.SpectralModel`
36
        Gammapy SpectralModel object corresponding to the provided name.
37
    ebl_atten: bool
38
        Boolean for EBL attenuated spectral model.
39
    """
40
    if base_model_type == "Fermi-XML":
41
        spectrum_type_no_ebl = spectrum_type.split("EblAtten::")[-1]
42
43
        match spectrum_type_no_ebl:
44
            case "PLSuperExpCutoff" | "PLSuperExpCutoff2":
45
                spectrum_type_final = "ExpCutoffPowerLawSpectralModel"
46
            case "PLSuperExpCutoff4":
47
                spectrum_type_final = "SuperExpCutoffPowerLaw4FGLDR3SpectralModel"
48
            case _:
49
                spectrum_type_final = f"{spectrum_type_no_ebl}SpectralModel"
50
51
        spectral_model = SPECTRAL_MODEL_REGISTRY.get_cls(spectrum_type_final)()
52
53
        if spectrum_type_no_ebl == "LogParabola":
54
            if "EblAtten" in spectrum_type:
55
                spectral_model = SPECTRAL_MODEL_REGISTRY.get_cls("PowerLawSpectralModel")()
56
                ebl_atten = True
57
            else:
58
                spectral_model = SPECTRAL_MODEL_REGISTRY.get_cls("LogParabolaSpectralModel")()
59
60
    return spectral_model, ebl_atten
61
62
63
def rename_fermi_energy_params(param_name):
64
    """ """
65
    match param_name:
66
        case "scale" | "eb":
67
            return "reference"
68
69
        case "breakvalue":
70
            return "ebreak"
71
72
        case "lowerlimit":
73
            return "emin"
74
75
        case "upperlimit":
76
            return "emax"
77
78
        case _:
79
            return "NA"
80
81
82
def rename_fermi_index_params(param_name, spectrum_type):
83
    """ """
84
    match param_name:
85
        case "index":
86
            return "index"
87
88
        case "index1":
89
            match spectrum_type:
90
                case "PLSuperExpCutoff" | "PLSuperExpCutoff2":
91
                    return "index"
92
                case _:
93
                    return "index1"  # For spectrum_type == "BrokenPowerLaw"
94
95
        case "indexs":
96
            return "index_1"  # For spectrum_type == "PLSuperExpCutoff4"
97
98
        case "index2":
99
            match spectrum_type:
100
                case "PLSuperExpCutoff4":
101
                    return "index_2"
102
                case "PLSuperExpCutoff" | "PLSuperExpCutoff2":
103
                    return "alpha"
104
                case _:
105
                    return "index2"  # For spectrum_type == "BrokenPowerLaw"
106
107
108
def params_renaming_to_gammapy(params_1_name, params_gpy, spectrum_type, params_1_base_model="Fermi-XML"):
109
    """
110
    Reading from a given parameter name, get basic parameters details like name,
111
    unit and is_norm as per Gammapy definition.
112
113
    This function collects all such switch cases, based on the base model of the
114
    given set of parameters.
115
    """
116
    if params_1_base_model == "Fermi-XML":
117
        match params_1_name:
118
            case "norm" | "prefactor" | "integral":
119
                params_gpy["name"] = "amplitude"
120
                params_gpy["unit"] = "cm-2 s-1 TeV-1"
121
                params_gpy["is_norm"] = True
122
123
            case "scale" | "eb" | "breakvalue" | "lowerlimit" | "upperlimit":
124
                params_gpy["name"] = rename_fermi_energy_params(params_1_name)
125
126
            case "index" | "index1" | "indexs" | "index2":
127
                params_gpy["name"] = rename_fermi_index_params(params_1_name, spectrum_type)
128
129
            case "cutoff" | "expfactor":
130
                params_gpy["name"] = "lambda_"
131
                params_gpy["unit"] = "TeV-1"
132
133
            case "expfactors":
134
                params_gpy["name"] = "expfactor"
135
136
    return params_gpy
137
138
139
def rescale_parameters(params_dict, scale_factor):
140
    """ """
141
    params_dict["value"] = float(params_dict["value"]) * float(params_dict["scale"]) * scale_factor
142
143
    if "error" in params_dict:
144
        params_dict["error"] = float(params_dict["error"]) * float(params_dict["scale"]) * scale_factor
145
146
    if scale_factor == -1:
147
        # Reverse the limits while changing the sign
148
        min_ = float(params_dict["min"]) * float(params_dict["scale"]) * scale_factor
149
        max_ = float(params_dict["max"]) * float(params_dict["scale"]) * scale_factor
150
151
        params_dict["min"] = min(min_, max_)
152
        params_dict["max"] = max(min_, max_)
153
    else:
154
        params_dict["min"] = float(params_dict["min"]) * float(params_dict["scale"]) * scale_factor
155
        params_dict["max"] = float(params_dict["max"]) * float(params_dict["scale"]) * scale_factor
156
157
    params_dict["scale"] = 1.0
158
159
    return params_dict
160
161
162
def invert_parameters(params_dict, scale_factor):
163
    """ """
164
    val_ = float(params_dict["value"]) * float(params_dict["scale"])
165
166
    params_dict["value"] = scale_factor / val_
167
168
    if "error" in params_dict:
169
        params_dict["error"] = scale_factor * float(params_dict["error"]) / (val_**2)
170
171
    min_ = scale_factor / (float(params_dict["min"]) * float(params_dict["scale"]))
172
    max_ = scale_factor / (float(params_dict["max"]) * float(params_dict["scale"]))
173
174
    params_dict["min"] = min(min_, max_)
175
    params_dict["max"] = max(min_, max_)
176
177
    params_dict["scale"] = 1.0
178
179
    return params_dict
180
181
182
def params_rescale_to_gammapy(params_gpy, spectrum_type, en_scale_1_to_gpy=1.0e-6, keep_sign=False):
183
    """
184
    Rescaling parameters to be used with Gammapy standard units, by using the
185
    various physical factors (energy for now), compared with the initial set of
186
    parameters as compared with Gammapy standard units.
187
188
    Also, scales the value, min and max of the given parameters, depending on
189
    their Parameter names.
190
    """
191
    match params_gpy["name"]:
192
        case "reference" | "ebreak" | "emin" | "emax":
193
            params_gpy["unit"] = "TeV"
194
            params_gpy = rescale_parameters(params_gpy, en_scale_1_to_gpy)
195
196
        case "amplitude":
197
            params_gpy = rescale_parameters(params_gpy, 1 / en_scale_1_to_gpy)
198
199
        case "index" | "index_1" | "index_2" | "beta":
200
            if not keep_sign:
201
                # Other than EBL Attenuated Power Law?
202
                # spectral indices in gammapy are always taken as positive values.
203
                val_ = float(params_gpy["value"]) * float(params_gpy["scale"])
204
                if val_ < 0:
205
                    params_gpy = rescale_parameters(params_gpy, -1)
206
207
        case "lambda_":
208
            if spectrum_type == "PLSuperExpCutoff":
209
                # Original parameter is inverse of what gammapy uses
210
                params_gpy = invert_parameters(params_gpy, en_scale_1_to_gpy)
211
212
    if float(params_gpy["scale"]) != 1.0:
213
        params_gpy = rescale_parameters(params_gpy, 1)
214
215
    return params_gpy
216
217
218
def param_dict_to_gammapy_parameter(new_par):
219
    """ """
220
    # Read into Gammapy Parameter object
221
222
    new_param = Parameter(name=new_par["name"], value=new_par["value"])
223
224
    if "error" in new_par:
225
        new_param.error = new_par["error"]
226
227
    new_param.min = new_par["min"]
228
    new_param.max = new_par["max"]
229
    new_param.unit = new_par["unit"]
230
    new_param.frozen = new_par["frozen"]
231
    new_param._is_norm = new_par["is_norm"]
232
233
    return new_param
234
235
236
def xml_spectral_model_to_gammapy(
237
    params, spectrum_type, is_target=False, keep_sign=False, base_model_type="Fermi-XML"
238
):
239
    """
240
    Convert the Spectral Models information from XML model of FermiTools to Gammapy
241
    standards and return Parameters list.
242
    Details of the XML model can be seen at
243
    https://fermi.gsfc.nasa.gov/ssc/data/analysis/scitools/source_models.html
244
    and with examples at
245
    https://fermi.gsfc.nasa.gov/ssc/data/analysis/scitools/xml_model_defs.html
246
247
    Models from the XML model, not read are -
248
    ExpCutoff (Blazar modeling with EBL absorption included),
249
    BPLExpCutoff,
250
    PLSuperExpCutoff3 (Pulsar modeling),
251
    BandFunction (GRB analysis)
252
253
    Parameters
254
    ----------
255
    params: `gammapy.modeling.Parameters`
256
        List of gammapy Parameter object of a particular Model
257
    spectrum_type: str
258
        Spectrum type as defined in XML. To be used only for special cases like
259
        PLSuperExpCutoff, PLSuperExpCutoff2 and PLSuperExpCutoff4
260
    is_target: bool
261
        Boolean to check if the given list of Parameters belong to the target
262
        source model or not.
263
    keep_sign: bool
264
        Boolean to keep the same sign on the parameter values or not.
265
    base_model_type: str
266
        Name indicating the XML format used to read the skymodels from.
267
268
    Returns
269
    -------
270
    params_final: `gammapy.modeling.Parameters`
271
        Final list of gammapy Parameter object
272
    """
273
    new_params = []
274
275
    # Some modifications on scaling/sign
276
    # By default for base_model_type == "Fermi-XML"
277
    en_scale_1_to_gpy = 1.0e-6
278
279
    for par in params:
280
        new_par = {}
281
282
        for key_ in par.keys():
283
            # Getting the "@par_name" for each parameter without the "@"
284
            if key_ != "@free":
285
                new_par[key_[1:].lower()] = par[key_]
286
            else:
287
                if par["@name"].lower() not in ["scale", "eb"]:
288
                    new_par["frozen"] = (par[key_] == "0") and not is_target
289
                else:
290
                    # Never change frozen status of Reference Energy
291
                    new_par["frozen"] = par[key_] == "0"
292
            new_par["unit"] = ""
293
            new_par["is_norm"] = False
294
295
            # Using the nomenclature as used in Gammapy
296
            new_par = params_renaming_to_gammapy(
297
                par["@name"].lower(), new_par, spectrum_type, params_1_base_model=base_model_type
298
            )
299
300
        new_par = params_rescale_to_gammapy(
301
            new_par, spectrum_type, en_scale_1_to_gpy=en_scale_1_to_gpy, keep_sign=keep_sign
302
        )
303
304
        if base_model_type == "Fermi-XML":
305
            if new_par["name"] == "alpha" and spectrum_type in [
306
                "PLSuperExpCutoff",
307
                "PLSuperExpCutoff2",
308
            ]:
309
                new_par["frozen"] = par["@free"] == "0"
310
311
        new_params.append(param_dict_to_gammapy_parameter(new_par))
312
313
    params_final2 = Parameters(new_params)
314
315
    # Modifications when different parameters are interconnected
316
    if base_model_type == "Fermi-XML":
317
        if spectrum_type == "PLSuperExpCutoff2":
318
            alpha_inv = 1 / params_final2["alpha"].value
319
            min_sign = 1
320
            if params_final2["lambda_"].min < 0:
321
                min_sign = -1
322
323
            params_final2["lambda_"].value = params_final2["lambda_"].value ** alpha_inv / en_scale_1_to_gpy
324
            params_final2["lambda_"].min = min_sign * (
325
                abs(params_final2["lambda_"].min) ** alpha_inv / en_scale_1_to_gpy
326
            )
327
            params_final2["lambda_"].max = params_final2["lambda_"].max ** alpha_inv / en_scale_1_to_gpy
328
329
    return params_final2
330
331
332
def fetch_spatial_galactic_frame(spatial_params, sigma=False):
333
    """ """
334
    if not sigma:
335
        sigma = None
336
337
    for par_ in spatial_params:
338
        match par_["@name"]:
339
            case "RA":
340
                lon_0 = f"{par_['@value']} deg"
341
            case "DEC":
342
                lat_0 = f"{par_['@value']} deg"
343
            case "Sigma":
344
                sigma = f"{par_['@value']} deg"
345
    fk5_frame = SkyCoord(
346
        lon_0,
347
        lat_0,
348
        frame="fk5",
349
    )
350
351
    return fk5_frame.transform_to("galactic"), sigma
352
353
354
def xml_spatial_model_to_gammapy(aux_path, xml_spatial_model, base_model_type="Fermi-XML"):
355
    """
356
    Read the spatial model component of the XMl model to Gammapy SpatialModel
357
    object.
358
359
    Details of the Fermi-XML model can be seen at
360
    https://fermi.gsfc.nasa.gov/ssc/data/analysis/scitools/source_models.html
361
    and with examples at
362
    https://fermi.gsfc.nasa.gov/ssc/data/analysis/scitools/xml_model_defs.html
363
364
    Parameters
365
    ----------
366
    aux_path: `Path`
367
        Path to the template diffuse models
368
    xml_spatial_model: `dict`
369
        Spatial Model component of a particular source from the XML file
370
    base_model_type: str
371
        Name indicating the XML format used to read the skymodels from.
372
373
    Returns
374
    -------
375
    spatial_model: `gammapy.modeling.models.SpatialModel`
376
        Gammapy Spatial Model object
377
    """
378
    spatial_pars = xml_spatial_model["parameter"]
379
380
    if base_model_type == "Fermi-XML":
381
        match xml_spatial_model["@type"]:
382
            case "SkyDirFunction":
383
                gal_frame, _ = fetch_spatial_galactic_frame(spatial_pars, sigma=False)
384
                spatial_model = SPATIAL_MODEL_REGISTRY.get_cls("PointSpatialModel")().from_position(gal_frame)
385
386
            case "SpatialMap":
387
                file_name = xml_spatial_model["@file"].split("/")[-1]
388
                file_path = aux_path / f"Templates/{file_name}"
389
390
                spatial_map = Map.read(file_path)
391
                spatial_map = spatial_map.copy(unit="sr^-1")
392
393
                spatial_model = SPATIAL_MODEL_REGISTRY.get_cls("TemplateSpatialModel")(
394
                    spatial_map, filename=file_path
395
                )
396
397
            case "RadialGaussian":
398
                gal_frame, sigma = fetch_spatial_galactic_frame(spatial_pars, sigma=True)
399
                spatial_model = SPATIAL_MODEL_REGISTRY.get_cls("GaussianSpatialModel")(
400
                    lon_0=gal_frame.l, lat_0=gal_frame.b, sigma=sigma, frame="galactic"
401
                )
402
403
    return spatial_model
404