Passed
Push — master ( 502a6e...660aa4 )
by Simon
04:50 queued 14s
created

SMBO.evaluate_init()   A

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 5
nop 2
dl 0
loc 5
rs 10
c 0
b 0
f 0
1
# Author: Simon Blanke
2
# Email: [email protected]
3
# License: MIT License
4
5
6
from ..local_opt import HillClimbingOptimizer
7
from .sampling import InitialSampler
8
9
import warnings
10
import numpy as np
11
12
np.seterr(divide="ignore", invalid="ignore")
13
14
15
class SMBO(HillClimbingOptimizer):
16
    def __init__(
17
        self,
18
        *args,
19
        warm_start_smbo=None,
20
        max_sample_size=10000000,
21
        sampling={"random": 1000000},
22
        replacement=True,
23
        **kwargs
24
    ):
25
        super().__init__(*args, **kwargs)
26
27
        self.warm_start_smbo = warm_start_smbo
28
        self.max_sample_size = max_sample_size
29
        self.sampling = sampling
30
        self.replacement = replacement
31
32
        self.sampler = InitialSampler(self.conv, max_sample_size)
33
34
        self.init_warm_start_smbo(warm_start_smbo)
35
36
    def init_warm_start_smbo(self, search_data):
37
        if search_data is not None:
38
            # filter out nan and inf
39
            warm_start_smbo = search_data[
40
                ~search_data.isin([np.nan, np.inf, -np.inf]).any(axis=1)
41
            ]
42
43
            # filter out elements that are not in search space
44
            int_idx_list = []
45
            for para_name in self.conv.para_names:
46
                search_data_dim = warm_start_smbo[para_name].values
47
                search_space_dim = self.conv.search_space[para_name]
48
49
                int_idx = np.nonzero(np.in1d(search_data_dim, search_space_dim))[0]
50
                int_idx_list.append(int_idx)
51
52
            intersec = int_idx_list[0]
53
            for int_idx in int_idx_list[1:]:
54
                intersec = np.intersect1d(intersec, int_idx)
55
            warm_start_smbo_f = warm_start_smbo.iloc[intersec]
56
57
            X_sample_values = warm_start_smbo_f[self.conv.para_names].values
58
            Y_sample = warm_start_smbo_f["score"].values
59
60
            self.X_sample = self.conv.values2positions(X_sample_values)
61
            self.Y_sample = list(Y_sample)
62
63
        else:
64
            self.X_sample = []
65
            self.Y_sample = []
66
67
    def track_X_sample(iterate):
68
        def wrapper(self, *args, **kwargs):
69
            pos = iterate(self, *args, **kwargs)
70
            self.X_sample.append(pos)
71
            return pos
72
73
        return wrapper
74
75
    def track_y_sample(evaluate):
76
        def wrapper(self, score):
77
            evaluate(self, score)
78
79
            if np.isnan(score) or np.isinf(score):
80
                del self.X_sample[-1]
81
            else:
82
                self.Y_sample.append(score)
83
84
        return wrapper
85
86
    def _sampling(self, all_pos_comb):
87
        if self.sampling is False:
88
            return all_pos_comb
89
        elif "random" in self.sampling:
90
            return self.random_sampling(all_pos_comb)
91
92
    def random_sampling(self, pos_comb):
93
        n_samples = self.sampling["random"]
94
        n_pos_comb = pos_comb.shape[0]
95
96
        if n_pos_comb <= n_samples:
97
            return pos_comb
98
        else:
99
            _idx_sample = np.random.choice(n_pos_comb, n_samples, replace=False)
100
            pos_comb_sampled = pos_comb[_idx_sample, :]
101
            return pos_comb_sampled
102
103
    def _all_possible_pos(self):
104
        pos_space = self.sampler.get_pos_space()
105
        n_dim = len(pos_space)
106
        all_pos_comb = np.array(np.meshgrid(*pos_space)).T.reshape(-1, n_dim)
107
108
        all_pos_comb_constr = []
109
        for pos in all_pos_comb:
110
            if self.conv.not_in_constraint(pos):
111
                all_pos_comb_constr.append(pos)
112
113
        all_pos_comb_constr = np.array(all_pos_comb_constr)
114
        return all_pos_comb_constr
115
116
    def memory_warning(self, max_sample_size):
117
        if (
118
            self.conv.search_space_size > self.warnings
119
            and max_sample_size > self.warnings
120
        ):
121
            warning_message0 = "\n Warning:"
122
            warning_message1 = (
123
                "\n search space size of "
124
                + str(self.conv.search_space_size)
125
                + " exceeding recommended limit."
126
            )
127
            warning_message3 = "\n Reduce search space size for better performance."
128
            warnings.warn(warning_message0 + warning_message1 + warning_message3)
129
130
    @track_X_sample
131
    def init_pos(self):
132
        return super().init_pos()
133
134
    @HillClimbingOptimizer.track_new_pos
135
    @track_X_sample
136
    def iterate(self):
137
        pos_new = self._propose_location()
138
        if not self.replacement:
139
            self._remove_position(pos_new)
140
        return pos_new
141
142
    def _remove_position(self, position):
143
        mask = np.all(self.all_pos_comb == position, axis=1)
144
        self.all_pos_comb = self.all_pos_comb[np.invert(mask)]
145
146
    @HillClimbingOptimizer.track_new_score
147
    @track_y_sample
148
    def evaluate(self, score_new):
149
        self._evaluate_new2current(score_new)
150
        self._evaluate_current2best()
151
152
    @HillClimbingOptimizer.track_new_score
153
    @track_y_sample
154
    def evaluate_init(self, score_new):
155
        self._evaluate_new2current(score_new)
156
        self._evaluate_current2best()
157
158
    def _propose_location(self):
159
        try:
160
            self._training()
161
        except ValueError:
162
            print(
163
                "Warning: training sequential model failed. Performing random iteration instead."
164
            )
165
            return self.move_random()
166
167
        exp_imp = self._expected_improvement()
168
169
        index_best = list(exp_imp.argsort()[::-1])
170
        all_pos_comb_sorted = self.pos_comb[index_best]
171
        pos_best = all_pos_comb_sorted[0]
172
173
        return pos_best
174