Completed
Push — main ( 4744cf...d735dd )
by Chaitanya
32s queued 23s
created

asgardpy.config.operations   A

Complexity

Total Complexity 21

Size/Duplication

Total Lines 155
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 71
dl 0
loc 155
rs 10
c 0
b 0
f 0
wmc 21

6 Functions

Rating   Name   Duplication   Size   Complexity  
A compound_model_dict_converstion() 0 19 1
C recursive_merge_dicts() 0 52 9
A all_model_templates() 0 12 2
A check_gammapy_model() 0 13 3
A get_model_template() 0 12 3
A deep_update() 0 12 3
1
"""
2
Main AsgardpyConfig Operations Module
3
"""
4
5
import logging
6
from collections.abc import Mapping
7
from pathlib import Path
8
9
import numpy as np
10
from gammapy.modeling.models import Models, SkyModel
11
12
__all__ = [
13
    "all_model_templates",
14
    "compound_model_dict_converstion",
15
    "get_model_template",
16
    "recursive_merge_dicts",
17
    "deep_update",
18
]
19
20
CONFIG_PATH = Path(__file__).resolve().parent
21
22
log = logging.getLogger(__name__)
23
24
25
def all_model_templates():
26
    """
27
    Collect all Template Models provided in Asgardpy, and their small tag names.
28
    """
29
    template_files = sorted(list(CONFIG_PATH.glob("model_templates/model_template*yaml")))
30
31
    all_tags = []
32
    for file in template_files:
33
        all_tags.append(file.name.split("_")[-1].split(".")[0])
34
    all_tags = np.array(all_tags)
35
36
    return all_tags, template_files
37
38
39
def get_model_template(spec_model_tag):
40
    """
41
    Read a particular template model yaml filename to create an AsgardpyConfig
42
    object.
43
    """
44
    all_tags, template_files = all_model_templates()
45
    new_model_file = None
46
47
    for file, tag in zip(template_files, all_tags, strict=True):
48
        if spec_model_tag == tag:
49
            new_model_file = file
50
    return new_model_file
51
52
53
def check_gammapy_model(gammapy_model):
54
    """
55
    For a given object type, try to read it as a Gammapy Models object.
56
    """
57
    if isinstance(gammapy_model, Models | SkyModel):
58
        models_gpy = Models(gammapy_model)
59
    else:
60
        try:
61
            models_gpy = Models.read(gammapy_model)
62
        except KeyError:
63
            raise TypeError("%s File cannot be read by Gammapy Models", gammapy_model) from KeyError
64
65
    return models_gpy
66
67
68
def recursive_merge_dicts(base_config, extra_config):
69
    """
70
    recursively merge two dictionaries.
71
    Entries in extra_config override entries in base_config. The built-in
72
    update function cannot be used for hierarchical dicts.
73
74
    Also for the case when there is a list of dicts involved, one has to be
75
    more careful. The extra_config may have longer list of dicts as compared
76
    with the base_config, in which case, the extra items are simply added to
77
    the merged final list.
78
79
    Combined here are 2 options from SO.
80
81
    See:
82
    http://stackoverflow.com/questions/3232943/update-value-of-a-nested-dictionary-of-varying-depth/3233356#3233356
83
    and also
84
    https://stackoverflow.com/questions/3232943/update-value-of-a-nested-dictionary-of-varying-depth/18394648#18394648
85
86
    Parameters
87
    ----------
88
    base_config : dict
89
        dictionary to be merged
90
    extra_config : dict
91
        dictionary to be merged
92
    Returns
93
    -------
94
    final_config : dict
95
        merged dict
96
    """
97
    final_config = base_config.copy()
98
99
    for key, value in extra_config.items():
100
        if key in final_config and isinstance(final_config[key], list):
101
            new_config = []
102
103
            for key_, value_ in zip(final_config[key], value, strict=False):
104
                key_ = recursive_merge_dicts(key_ or {}, value_)
105
                new_config.append(key_)
106
107
            # For example moving from a smaller list of model parameters to a
108
            # longer list.
109
            if len(final_config[key]) < len(extra_config[key]):
110
                for value_ in value[len(final_config[key]) :]:
111
                    new_config.append(value_)
112
            final_config[key] = new_config
113
114
        elif key in final_config and isinstance(final_config[key], dict):
115
            final_config[key] = recursive_merge_dicts(final_config.get(key) or {}, value)
116
        else:
117
            final_config[key] = value
118
119
    return final_config
120
121
122
def deep_update(d, u):
123
    """
124
    Recursively update a nested dictionary.
125
126
    Just like in Gammapy, taken from: https://stackoverflow.com/a/3233356/19802442
127
    """
128
    for k, v in u.items():
129
        if isinstance(v, Mapping):
130
            d[k] = deep_update(d.get(k, {}), v)
131
        else:
132
            d[k] = v
133
    return d
134
135
136
def compound_model_dict_converstion(dict):
137
    """
138
    Given a Gammapy CompoundSpectralModel as a dict object, convert it into
139
    an Asgardpy form.
140
    """
141
    ebl_abs = dict["model2"]
142
    ebl_abs["alpha_norm"] = ebl_abs["parameters"][0]["value"]
143
    ebl_abs["redshift"] = ebl_abs["parameters"][1]["value"]
144
    ebl_abs.pop("parameters", None)
145
146
    dict["type"] = dict["model1"]["type"]
147
    dict["parameters"] = dict["model1"]["parameters"]
148
    dict["ebl_abs"] = ebl_abs
149
150
    dict.pop("model1", None)
151
    dict.pop("model2", None)
152
    dict.pop("operator", None)
153
154
    return dict
155