Passed
Push — master ( 7b482c...5d2e95 )
by Simon
08:24
created

SearchProcess._memory_processor()   B

Complexity

Conditions 6

Size

Total Lines 29
Code Lines 22

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
eloc 22
nop 1
dl 0
loc 29
rs 8.4186
c 0
b 0
f 0
# Author: Simon Blanke
# Email: [email protected]
# License: MIT License
4
5
import os
6
import time
7
import numpy as np
8
9
from .search_space import SearchSpace
10
from .model import Model
11
from .init_position import InitSearchPosition
12
13
from hypermemory import Hypermemory
14
from importlib import import_module
15
16
17
def meta_data_path():
    """Return the path of the ``meta_data`` directory next to this module.

    The location of this file is resolved with ``os.path.realpath`` and its
    directory is joined with ``meta_data``. The returned string ends with a
    path separator.
    """
    module_dir = os.path.dirname(os.path.realpath(__file__))
    # Joining with "" appends the trailing separator, so the result keeps the
    # "<dir>/meta_data/" shape without assuming "/" is the path separator.
    return os.path.join(module_dir, "meta_data", "")
20
21
22
# Short optimizer name -> name of the attribute that SearchProcess fetches
# from the "gradient_free_optimizers" module via getattr().
optimizer_dict = {
    "HillClimbing": "HillClimbingOptimizer",
    "StochasticHillClimbing": "StochasticHillClimbingOptimizer",
    "TabuSearch": "TabuOptimizer",
    "RandomSearch": "RandomSearchOptimizer",
    "RandomRestartHillClimbing": "RandomRestartHillClimbingOptimizer",
    "RandomAnnealing": "RandomAnnealingOptimizer",
    "SimulatedAnnealing": "SimulatedAnnealingOptimizer",
    "StochasticTunneling": "StochasticTunnelingOptimizer",
    "ParallelTempering": "ParallelTemperingOptimizer",
    "ParticleSwarm": "ParticleSwarmOptimizer",
    "EvolutionStrategy": "EvolutionStrategyOptimizer",
    "Bayesian": "BayesianOptimizer",
    "TPE": "TreeStructuredParzenEstimators",
    "DecisionTree": "DecisionTreeOptimizer",
}
38
39
40
class ShortTermMemory:
    """Holds the memory dictionaries and best-result fields of a search."""

    def __init__(self, _space_, _main_args_, _cand_):
        """Store the given references and reset all memory state.

        ``_space_`` and ``_main_args_`` are kept on the instance, and
        ``_main_args_.memory`` is copied to ``memory_type``. ``_cand_`` is
        accepted but not used here.
        """
        # References handed in by the caller.
        self._space_, self._main_args_ = _space_, _main_args_

        # Best result so far: no position yet, worst possible score.
        self.pos_best, self.score_best = None, -np.inf

        self.memory_type = _main_args_.memory
        # Two separate, initially empty dictionaries.
        self.memory_dict, self.memory_dict_new = {}, {}

        self.meta_data_found = False

        self.n_dims = None
55
56
57
class HypermemoryWrapper:
    """Helper that attaches a Hypermemory instance to the object.

    NOTE(review): ``load_memory`` reads ``obj_func``, ``search_space`` and
    ``eval_pos_Mem`` from ``self``, none of which are set in this class --
    presumably they come from a class this one is combined with; confirm.
    """

    def __init__(self):
        pass

    def load_memory(self, X, y):
        """Create the Hypermemory for ``X``/``y`` and load its stored data.

        Sets ``self.mem``, routes ``self.eval_pos`` to ``self.eval_pos_Mem``
        and stores the result of ``load()`` in ``self.memory_dict``.
        """
        memory = Hypermemory(X, y, self.obj_func, self.search_space)
        self.mem = memory
        self.eval_pos = self.eval_pos_Mem
        self.memory_dict = memory.load()
65
66
67
class SearchProcess:
    """Run one optimization process and keep track of its results.

    The instance resolves the optimizer class from ``optimizer_dict``,
    evaluates positions through a ``Model`` and records the best score and
    position, optionally caching evaluations in a memory dictionary.
    """

    def __init__(self, nth_process, pro_arg, verb):
        """Read the search settings and prepare all bookkeeping state.

        Parameters
        ----------
        nth_process
            Identifier of this process; stored on the instance.
        pro_arg
            Object providing ``kwargs`` (the dict of search settings read
            below) and ``optimizer`` (a key of ``optimizer_dict``). Its
            ``n_positions`` and ``set_random_seed`` are used by ``search``.
        verb
            Verbosity helper; passed to SearchSpace, Model and
            InitSearchPosition and used for progress-bar/info output.
        """
        self.nth_process = nth_process
        self.pro_arg = pro_arg
        self.verb = verb

        kwargs = self.pro_arg.kwargs
        module = import_module("gradient_free_optimizers")

        self.opt_class = getattr(module, optimizer_dict[pro_arg.optimizer])
        self.obj_func = kwargs["objective_function"]
        self.func_para = kwargs["function_parameter"]
        self.search_space = kwargs["search_space"]
        self.n_iter = kwargs["n_iter"]
        self.n_jobs = kwargs["n_jobs"]
        self.memory = kwargs["memory"]
        self.init_para = kwargs["init_para"]
        self.distribution = kwargs["distribution"]

        self.space = SearchSpace(kwargs["search_space"], verb)
        self.model = Model(self.obj_func, self.func_para, verb)
        self.init = InitSearchPosition(self.init_para, self.space, verb)

        self.start_time = time.time()
        self.i = 0

        self.memory_dict = {}
        self.memory_dict_new = {}

        self._score = -np.inf
        self._pos = None

        self.score_best = -np.inf
        self.pos_best = None

        self.score_list = []
        self.pos_list = []

        self.eval_time = []
        self.iter_times = []

        self._memory_processor()

    def _memory_processor(self):
        """Select the evaluation method according to ``self.memory``.

        A falsy value disables memory, ``"short"`` caches evaluations in the
        in-process dictionary, and ``"long"`` additionally creates a
        Hypermemory object and loads its stored dictionary. Any other value
        prints a warning and disables memory.
        """
        if not self.memory:
            self.mem = None
            self.eval_pos = self.eval_pos_noMem

        elif self.memory == "short":
            self.mem = None
            self.eval_pos = self.eval_pos_Mem

        elif self.memory == "long":
            self.mem = Hypermemory(
                self.func_para["features"],
                self.func_para["target"],
                self.obj_func,
                self.search_space,
            )
            self.eval_pos = self.eval_pos_Mem

            self.memory_dict = self.mem.load()

        else:
            print("Warning: Memory not defined")
            self.mem = None
            self.eval_pos = self.eval_pos_noMem

        # Start from the best result of the loaded memory when it has one.
        if self.mem and self.mem.meta_data_found:
            self.pos_best = self.mem.pos_best
            self.score_best = self.mem.score_best

    def _get_warm_start(self):
        """Return the parameters of the best position found so far."""
        return self.space.pos2para(self.pos_best)

    def _process_results(self):
        """Finish the run and return the value of ``print_start_point``.

        Stores the elapsed time in ``self.total_time`` and, for long-term
        memory, dumps the evaluations collected in ``memory_dict_new``.
        """
        self.total_time = time.time() - self.start_time
        start_point = self.verb.info.print_start_point(self)

        if self.memory == "long":
            self.mem.dump(self.memory_dict_new)

        return start_point

    @property
    def score(self):
        """Most recently assigned score."""
        return self._score

    @score.setter
    def score(self, value):
        # Every assignment is also recorded in the history list.
        self.score_list.append(value)
        self._score = value

    @property
    def pos(self):
        """Most recently assigned position."""
        # The getter must mirror the setter below and read _pos, not _score.
        return self._pos

    @pos.setter
    def pos(self, value):
        # Every assignment is also recorded in the history list.
        self.pos_list.append(value)
        self._pos = value

    def base_eval(self, pos, nth_iter):
        """Evaluate ``pos`` with the model and update the best result.

        Returns the results mapping produced by ``self.model.eval``; its
        ``"score"`` entry is compared against ``self.score_best``.
        """
        para = self.space.pos2para(pos)
        # NOTE(review): self.i is set to 0 in __init__ and is not advanced
        # anywhere in this class -- confirm whether nth_iter was intended.
        para["iteration"] = self.i
        results = self.model.eval(para)

        if results["score"] > self.score_best:
            self.score_best = results["score"]
            self.pos_best = pos

            self.verb.p_bar.best_since_iter = nth_iter

        return results

    def eval_pos_noMem(self, pos, nth_iter):
        """Evaluate ``pos`` without any caching and return its score."""
        return self.base_eval(pos, nth_iter)["score"]

    def eval_pos_Mem(self, pos, nth_iter, force_eval=False):
        """Return the score of ``pos``, served from memory when available.

        ``pos`` is expected to provide ``astype`` (e.g. a numpy array). With
        ``force_eval=True`` the position is evaluated even if it is cached.
        New evaluations are stored in both memory dictionaries.
        """
        # astype returns a new array instead of converting in place, so the
        # integer cast has to be applied where the dictionary key is built.
        pos_tuple = tuple(pos.astype(int))

        if not force_eval and pos_tuple in self.memory_dict:
            return self.memory_dict[pos_tuple]["score"]

        results = self.base_eval(pos, nth_iter)
        self.memory_dict[pos_tuple] = results
        self.memory_dict_new[pos_tuple] = results

        return results["score"]

    def _get_score(self, pos_new, nth_iter):
        """Evaluate ``pos_new``, advance the progress bar, return the score."""
        score_new = self.eval_pos(pos_new, nth_iter)
        self.verb.p_bar.update_p_bar(1, self.score_best)

        # NOTE(review): base_eval has already raised self.score_best to at
        # least any freshly evaluated score, so this branch can only fire for
        # scores served from the memory dictionary -- confirm this is intended.
        if score_new > self.score_best:
            self.score = score_new
            self.pos = pos_new

        return score_new

    def search(self, nth_process):
        """Run the initialization and iteration loops of the optimizer.

        Calls ``pro_arg.set_random_seed(nth_process)``, creates the optimizer
        from the initial positions and returns ``self.opt.p_list`` once the
        iterations are done.
        """
        self.pro_arg.set_random_seed(nth_process)
        self.verb.p_bar.init_p_bar(nth_process, self.n_iter, self.obj_func)

        n_positions = self.pro_arg.n_positions
        init_positions = self.init.set_start_pos(n_positions)
        self.opt = self.opt_class(init_positions, self.space.dim, opt_para={})

        n_init = len(init_positions)

        # Evaluate each of the initial positions once.
        for nth_init in range(n_init):
            pos_new = self.opt.init_pos(nth_init)
            score_new = self._get_score(pos_new, 0)
            self.opt.evaluate(score_new)

        # The iteration counter continues after the initial evaluations, so
        # those are part of the n_iter budget.
        for nth_iter in range(n_init, self.n_iter):
            pos_new = self.opt.iterate(nth_iter)
            score_new = self._get_score(pos_new, nth_iter)
            self.opt.evaluate(score_new)

        self.verb.p_bar.close_p_bar()

        return self.opt.p_list
238