Passed
Pull Request — main (#178)
by Chaitanya
05:19
created

check_preferred_model.write_output_config_yaml()   A

Complexity

Conditions 1

Size

Total Lines 22
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 14
nop 1
dl 0
loc 22
rs 9.7
c 0
b 0
f 0
1
import argparse
2
import logging
3
from pathlib import Path
4
5
import numpy as np
6
7
from asgardpy.analysis import AsgardpyAnalysis
8
from asgardpy.config import AsgardpyConfig
9
from asgardpy.stats import (
10
    check_model_preference_aic,
11
    copy_target_config,
12
    fetch_all_analysis_fit_info,
13
    get_model_config_files,
14
    tabulate_best_fit_stats,
15
    write_output_config_yaml,
16
)
17
18
log = logging.getLogger(__name__)
19
20
parser = argparse.ArgumentParser(description="Get preferred best-fit spectral model")
21
22
parser.add_argument(
23
    "--config",
24
    "-c",
25
    help="Path to the config file",
26
)
27
28
parser.add_argument("--ebl-scale-factor", "-e", help="Value of EBL Norm Scale Factor", default=1.0, type=float)
29
30
parser.add_argument(
31
    "--ebl-model-name",
32
    "-m",
33
    help="Name of EBL model as used by Gammapy",
34
    default="dominguez",
35
    type=str,
36
)
37
38
parser.add_argument(
39
    "--write-config",
40
    help="Boolean to write the best-fit model into a separate file.",
41
    default=True,
42
    type=bool,
43
)
44
45
46
def fetch_all_analysis_objects(main_config, spec_model_temp_files, ebl_scale_factor, ebl_model_name):
47
    """For a list of spectral models, initiate AsgardpyAnalysis objects."""
48
    main_analysis_list = {}
49
    spec_models_list = []
50
51
    for temp in spec_model_temp_files:
52
        temp_model = AsgardpyAnalysis(main_config)
53
        temp_model.config.target.models_file = temp
54
55
        temp_model_2 = AsgardpyAnalysis(temp_model.config)
56
57
        copy_target_config(temp_model, temp_model_2)
58
59
        if ebl_scale_factor != 1.0:
60
            temp_model_2.config.target.components[0].spectral.ebl_abs.alpha_norm = ebl_scale_factor
61
62
        if ebl_model_name != "dominguez":
63
            temp_model_2.config.target.components[0].spectral.ebl_abs.reference = ebl_model_name.replace("_", "-")
64
        else:
65
            temp_model_2.config.target.components[
66
                0
67
            ].spectral.ebl_abs.reference = temp_model.config.target.components[0].spectral.ebl_abs.reference
68
69
        spec_tag = temp.name.split(".")[0].split("_")[-1]
70
        spec_models_list.append(spec_tag)
71
        main_analysis_list[spec_tag] = {}
72
73
        main_analysis_list[spec_tag]["Analysis"] = temp_model_2
74
75
    spec_models_list = np.array(spec_models_list)
76
77
    return main_analysis_list, spec_models_list
78
79
80
def main():
81
    args = parser.parse_args()
82
83
    main_config = AsgardpyConfig.read(args.config)
84
    config_path = Path(args.config)
85
    config_path_file_name = config_path.name.split(".")[0]
86
    target_source_name = main_config.target.source_name
87
88
    steps_list = []
89
    for s in main_config.general.steps:
90
        if s != "flux-points":
91
            steps_list.append(s)
92
    log.info("Target source is: %s", target_source_name)
93
94
    spec_model_temp_files = get_model_config_files(["lp", "bpl", "ecpl", "pl", "eclp", "sbpl"])
95
96
    main_analysis_list, spec_models_list = fetch_all_analysis_objects(
97
        main_config, spec_model_temp_files, args.ebl_scale_factor, args.ebl_model_name
98
    )
99
100
    # Run Analysis Steps till Fit
101
    PL_idx = 0
102
103
    for i, tag in enumerate(spec_models_list):
104
        log.info("Spectral model being tested: %s", tag)
105
106
        main_analysis_list[tag]["Analysis"].run(steps_list)
107
108
        if tag == "pl":
109
            PL_idx = i
110
111
    fit_success_list, stat_list, dof_list, pref_over_pl_chi2_list = fetch_all_analysis_fit_info(
112
        main_analysis_list, spec_models_list
113
    )
114
115
    # If any spectral model has at least 5 sigmas preference over PL
116
    best_sp_idx_lrt = np.nonzero(pref_over_pl_chi2_list == np.nanmax(pref_over_pl_chi2_list))[0]
117
    for idx in best_sp_idx_lrt:
118
        if pref_over_pl_chi2_list[idx] > 5:
119
            sp_idx_lrt = idx
120
            log.info("Best preferred spectral model over PL is %s", spec_models_list[idx])
121
        else:
122
            sp_idx_lrt = PL_idx
123
            log.info("No other model preferred over PL")
124
125
    list_rel_p = check_model_preference_aic(stat_list, dof_list)
126
127
    best_sp_idx_aic = np.nonzero(list_rel_p == np.nanmax(list_rel_p))[0]
128
129
    for idx in best_sp_idx_aic:
130
        if list_rel_p[idx] > 0.95:
131
            sp_idx_aic = idx
132
            log.info("Best preferred spectral model is %s", spec_models_list[fit_success_list][idx])
133
        else:
134
            sp_idx_aic = PL_idx
135
            log.info("No other model preferred, hence PL is selected")
136
137
    stats_table = tabulate_best_fit_stats(spec_models_list, fit_success_list, main_analysis_list, list_rel_p)
138
139
    stats_table.meta["Target source name"] = target_source_name
140
    stats_table.meta["EBL model"] = args.ebl_model_name
141
    stats_table.meta["EBL scale factor"] = args.ebl_scale_factor
142
143
    file_name = f"{config_path_file_name}_{args.ebl_model_name}_{args.ebl_scale_factor}_fit_stats.ecsv"
144
    stats_table.write(
145
        main_config.general.outdir / file_name,
146
        format="ascii.ecsv",
147
        overwrite=True,
148
    )
149
150
    if args.write_config:
151
        log.info("Write the spectral model")
152
153
        for idx, name in zip([sp_idx_lrt, sp_idx_aic], ["lrt", "aic"], strict=False):
0 ignored issues
show
introduced by
The variable sp_idx_lrt does not seem to be defined in case the for loop on line 117 is not entered. Are you sure this can never be the case?
Loading history...
introduced by
The variable sp_idx_aic does not seem to be defined in case the for loop on line 129 is not entered. Are you sure this can never be the case?
Loading history...
154
            tag = spec_models_list[fit_success_list][idx]
155
156
            path = config_path.parent / f"{config_path_file_name}_model_most_pref_{name}.yaml"
157
158
            yaml_ = write_output_config_yaml(main_analysis_list[tag]["Analysis"].final_model[0])
159
            path.write_text(yaml_)
160
161
162
if __name__ == "__main__":
163
    main()
164