Passed
Push — master ( cba5ed...7e35c2 )
by Simon
04:17
created

TrafoClass._convert_results2hyper()   A

Complexity

Conditions 2

Size

Total Lines 24
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 17
nop 1
dl 0
loc 24
rs 9.55
c 0
b 0
f 0
1
# Author: Simon Blanke
2
# Email: [email protected]
3
# License: MIT License
4
5
import copy
6
import inspect
7
import numpy as np
8
import pandas as pd
9
10
11
from .objective_function import ObjectiveFunction
12
from .hyper_gradient_trafo import HyperGradientTrafo
13
14
15
class TrafoClass:
16
    def __init__(self, *args, **kwargs):
17
        pass
18
19
    def _convert_args2gfo(self, memory_warm_start):
20
        memory_warm_start = self.trafo.trafo_memory_warm_start(memory_warm_start)
21
22
        return memory_warm_start
23
24
    def _positions2results(self, positions):
25
        results_dict = {}
26
27
        for para_name in self.conv.para_names:
28
            values_list = self.search_space[para_name]
29
            pos_ = positions[para_name].values
30
            values_ = [values_list[idx] for idx in pos_]
31
            results_dict[para_name] = values_
32
33
        results = pd.DataFrame.from_dict(results_dict)
34
35
        diff_list = np.setdiff1d(positions.columns, results.columns)
36
        results[diff_list] = positions[diff_list]
37
38
        return results
39
40
    def _convert_results2hyper(self):
41
        self.eval_time = np.array(self._optimizer.eval_times).sum()
42
        self.iter_time = np.array(self._optimizer.iter_times).sum()
43
44
        if self._optimizer.best_para is not None:
45
            value = self.trafo.para2value(self._optimizer.best_para)
46
            position = self.trafo.position2value(value)
47
            best_para = self.trafo.value2para(position)
48
49
            self.best_para = best_para
50
        else:
51
            self.best_para = None
52
53
        self.best_score = self._optimizer.best_score
54
        self.positions = self._optimizer.results
55
56
        self.results = self._positions2results(self.positions)
57
58
        results_dd = self._optimizer.results.drop_duplicates(
59
            subset=self.trafo.para_names, keep="first"
60
        )
61
        self.memory_values_df = results_dd[
62
            self.trafo.para_names + ["score"]
63
        ].reset_index(drop=True)
64
65
66
class _BaseOptimizer_(TrafoClass):
67
    def __init__(self, **opt_params):
68
        super().__init__()
69
        self.opt_params = opt_params
70
71
    def init(self, search_space, initialize, data_c):
72
        self.search_space = search_space
73
        self.initialize = initialize
74
        self.data_c = data_c
75
76
        self.trafo = HyperGradientTrafo(search_space)
77
78
        initialize = self.trafo.trafo_initialize(initialize)
79
        search_space_positions = self.trafo.search_space_positions
80
81
        # trafo warm start for smbo from values into positions
82
        if "warm_start_smbo" in self.opt_params:
83
            self.opt_params["warm_start_smbo"] = self.trafo.trafo_memory_warm_start(
84
                self.opt_params["warm_start_smbo"]
85
            )
86
87
        self._optimizer = self._OptimizerClass(
88
            search_space_positions, initialize, **self.opt_params
89
        )
90
91
        self.conv = self._optimizer.conv
92
93
    def check_LTM(self, memory):
94
        try:
95
            memory.study_id
96
            memory.model_id
97
        except:
98
            self.memory = memory
99
        else:
100
            self.init_ltm(memory)
101
102
    def init_ltm(self, memory):
103
        self.ltm = copy.deepcopy(memory)
104
        self.ltm.init_study(
105
            self.objective_function, self.search_space, self.nth_process
106
        )
107
        self.memory_warm_start = self.ltm.load()
108
        self.memory = True
109
110
        print("\n self.memory_warm_start \n", self.memory_warm_start)
111
112
    def search(
113
        self,
114
        objective_function,
115
        n_iter,
116
        max_time=None,
117
        max_score=None,
118
        memory=True,
119
        memory_warm_start=None,
120
        verbosity={
121
            "progress_bar": True,
122
            "print_results": True,
123
            "print_times": True,
124
        },
125
        random_state=None,
126
        nth_process=None,
127
    ):
128
        self.objective_function = objective_function
129
        self.nth_process = nth_process
130
131
        gfo_wrapper_model = ObjectiveFunction(
132
            objective_function, self._optimizer, nth_process
133
        )
134
135
        # ltm init
136
        self.check_LTM(memory)
137
        memory_warm_start = self._convert_args2gfo(memory_warm_start)
138
139
        self._optimizer.search(
140
            gfo_wrapper_model(self.search_space, self.data_c),
141
            n_iter,
142
            max_time,
143
            max_score,
144
            memory,
145
            memory_warm_start,
146
            verbosity,
147
            random_state,
148
            nth_process,
149
        )
150
151
        self._convert_results2hyper()
152
        self.p_bar = self._optimizer.p_bar
153
154
        # ltm save after finish
155
        """
156
        if inspect.isclass(type(memory)):
157
            self.ltm.save_on_finish(self.results)
158
        """
159