gradient_free_optimizers.optimizers.smb_opt.smbo   A
last analyzed

Complexity

Total Complexity 28

Size/Duplication

Total Lines 191
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 28
eloc 137
dl 0
loc 191
rs 10
c 0
b 0
f 0

14 Methods

Rating   Name   Duplication   Size   Complexity  
A SMBO.iterate() 0 4 1
A SMBO._remove_position() 0 3 1
A SMBO._propose_location() 0 16 2
A SMBO.memory_warning() 0 16 3
A SMBO.init_pos() 0 3 1
A SMBO.random_sampling() 0 10 2
A SMBO._all_possible_pos() 0 12 3
A SMBO.track_y_sample() 0 10 3
A SMBO.evaluate() 0 8 2
A SMBO.evaluate_init() 0 5 1
A SMBO.init_warm_start_smbo() 0 32 4
A SMBO.__init__() 0 30 1
A SMBO.track_X_sample() 0 7 1
A SMBO._sampling() 0 5 3
1
# Author: Simon Blanke
2
# Email: [email protected]
3
# License: MIT License
4
5
6
from ..base_optimizer import BaseOptimizer
7
from .sampling import InitialSampler
8
9
import logging
10
import numpy as np
11
12
# Silence numpy's divide-by-zero / invalid-value warnings globally.
# NOTE(review): this mutates numpy's process-wide error state — presumably to
# keep surrogate-model math (e.g. expected-improvement ratios) quiet; confirm.
np.seterr(divide="ignore", invalid="ignore")
13
14
15
class SMBO(BaseOptimizer):
    """Base class for sequential model-based optimization (SMBO).

    Keeps a running record of evaluated positions (``X_sample``) and their
    scores (``Y_sample``) which surrogate-model subclasses fit in order to
    propose the next position to evaluate.  Subclasses are expected to
    provide ``_training()`` and ``_expected_improvement()`` (used by
    ``_propose_location``).
    """

    def __init__(
        self,
        search_space,
        initialize=None,
        constraints=None,
        random_state=None,
        rand_rest_p=0,
        nth_process=None,
        warm_start_smbo=None,
        max_sample_size=10000000,
        sampling=None,
        replacement=True,
    ):
        """Set up the optimizer and seed it from optional warm-start data.

        Parameters
        ----------
        search_space : dict
            Parameter names mapped to arrays of candidate values.
        initialize : dict, optional
            Initialization strategy; defaults to
            ``{"grid": 4, "random": 2, "vertices": 4}``.
        constraints : list, optional
            Constraint functions; defaults to an empty list.
        random_state : int, optional
            Seed forwarded to the base optimizer.
        rand_rest_p : float
            Probability of a random-restart step (forwarded to base).
        nth_process : int, optional
            Process index (forwarded to base).
        warm_start_smbo : pandas.DataFrame, optional
            Previous search data with one column per parameter plus "score".
        max_sample_size : int
            Upper bound handed to the ``InitialSampler``.
        sampling : dict or False, optional
            Candidate sub-sampling config; defaults to
            ``{"random": 1000000}``.  ``False`` disables sub-sampling.
        replacement : bool
            If False, evaluated positions are removed from the candidate
            pool (see ``evaluate``/``_remove_position``).
        """
        # Use None sentinels instead of mutable default arguments so the
        # default dict/list objects are not shared across instances
        # (classic Python mutable-default pitfall).
        if initialize is None:
            initialize = {"grid": 4, "random": 2, "vertices": 4}
        if constraints is None:
            constraints = []
        if sampling is None:
            sampling = {"random": 1000000}

        super().__init__(
            search_space=search_space,
            initialize=initialize,
            constraints=constraints,
            random_state=random_state,
            rand_rest_p=rand_rest_p,
            nth_process=nth_process,
        )

        self.warm_start_smbo = warm_start_smbo
        self.max_sample_size = max_sample_size
        self.sampling = sampling
        self.replacement = replacement

        self.sampler = InitialSampler(self.conv, max_sample_size)

        self.init_warm_start_smbo(warm_start_smbo)

    def init_warm_start_smbo(self, search_data):
        """Seed ``X_sample``/``Y_sample`` from previous search data.

        Rows containing nan/inf and rows whose parameter values are not part
        of the current search space are dropped; the surviving rows are
        converted to positions.  With ``search_data=None`` both sample lists
        start empty.
        """
        if search_data is not None:
            # filter out rows containing nan or +/-inf in any column
            warm_start_smbo = search_data[
                ~search_data.isin([np.nan, np.inf, -np.inf]).any(axis=1)
            ]

            # per dimension: row indices whose value is a valid
            # search-space value
            int_idx_list = []
            for para_name in self.conv.para_names:
                search_data_dim = warm_start_smbo[para_name].values
                search_space_dim = self.conv.search_space[para_name]

                int_idx = np.nonzero(
                    np.isin(search_data_dim, search_space_dim)
                )[0]
                int_idx_list.append(int_idx)

            # keep only rows that are valid in *every* dimension
            intersec = int_idx_list[0]
            for int_idx in int_idx_list[1:]:
                intersec = np.intersect1d(intersec, int_idx)
            warm_start_smbo_f = warm_start_smbo.iloc[intersec]

            X_sample_values = warm_start_smbo_f[self.conv.para_names].values
            Y_sample = warm_start_smbo_f["score"].values

            self.X_sample = self.conv.values2positions(X_sample_values)
            self.Y_sample = list(Y_sample)

        else:
            self.X_sample = []
            self.Y_sample = []

    # NOTE: ``track_X_sample`` and ``track_y_sample`` are decorators defined
    # in the class body (not instance methods); they are applied to the
    # methods below at class-creation time.
    def track_X_sample(iterate):
        """Decorator: append each returned position to ``self.X_sample``."""

        def wrapper(self, *args, **kwargs):
            pos = iterate(self, *args, **kwargs)
            self.X_sample.append(pos)
            return pos

        return wrapper

    def track_y_sample(evaluate):
        """Decorator: record valid scores in ``self.Y_sample``; for nan/inf
        scores, drop the matching position to keep the two lists aligned."""

        def wrapper(self, score):
            evaluate(self, score)

            if np.isnan(score) or np.isinf(score):
                # the most recent position produced an invalid score:
                # discard it instead of recording the score
                del self.X_sample[-1]
            else:
                self.Y_sample.append(score)

        return wrapper

    def _sampling(self, all_pos_comb):
        """Optionally sub-sample the candidate positions.

        ``self.sampling`` is either ``False`` (use all candidates) or a dict
        like ``{"random": n}``; other dict contents return None.
        """
        if self.sampling is False:
            return all_pos_comb
        elif "random" in self.sampling:
            return self.random_sampling(all_pos_comb)

    def random_sampling(self, pos_comb):
        """Return at most ``self.sampling["random"]`` rows of ``pos_comb``,
        drawn uniformly without replacement."""
        n_samples = self.sampling["random"]
        n_pos_comb = pos_comb.shape[0]

        if n_pos_comb <= n_samples:
            return pos_comb
        else:
            _idx_sample = np.random.choice(n_pos_comb, n_samples, replace=False)
            pos_comb_sampled = pos_comb[_idx_sample, :]
            return pos_comb_sampled

    def _all_possible_pos(self):
        """Build the cartesian product of the position space, keeping only
        positions that satisfy the constraints."""
        pos_space = self.sampler.get_pos_space()
        n_dim = len(pos_space)
        all_pos_comb = np.array(np.meshgrid(*pos_space)).T.reshape(-1, n_dim)

        all_pos_comb_constr = []
        for pos in all_pos_comb:
            if self.conv.not_in_constraint(pos):
                all_pos_comb_constr.append(pos)

        all_pos_comb_constr = np.array(all_pos_comb_constr)
        return all_pos_comb_constr

    def memory_warning(self, max_sample_size):
        """Log a warning when the search-space size exceeds the recommended
        limit.

        NOTE(review): ``self.warnings`` (the threshold) is not set in this
        class — presumably defined by the base class or a subclass; confirm.
        """
        if (
            self.conv.search_space_size > self.warnings
            and max_sample_size > self.warnings
        ):
            warning_message0 = "\n Warning:"
            warning_message1 = (
                "\n search space size of "
                + str(self.conv.search_space_size)
                + " exceeding recommended limit."
            )
            warning_message3 = (
                "\n Reduce search space size for better performance."
            )
            logging.warning(
                warning_message0 + warning_message1 + warning_message3
            )

    @track_X_sample
    def init_pos(self):
        """Delegate to the base class; the decorator records the position."""
        return super().init_pos()

    @BaseOptimizer.track_new_pos
    @track_X_sample
    def iterate(self):
        """Propose the next position via the surrogate model."""
        return self._propose_location()

    def _remove_position(self, position):
        """Remove ``position`` from the candidate pool (no-replacement mode)."""
        mask = np.all(self.all_pos_comb == position, axis=1)
        self.all_pos_comb = self.all_pos_comb[np.invert(mask)]

    @BaseOptimizer.track_new_score
    @track_y_sample
    def evaluate(self, score_new):
        """Register a new score; without replacement, retire the evaluated
        position from the candidate pool."""
        self._evaluate_new2current(score_new)
        self._evaluate_current2best()

        if not self.replacement:
            self._remove_position(self.pos_new)

    @BaseOptimizer.track_new_score
    @track_y_sample
    def evaluate_init(self, score_new):
        """Register a score produced during initialization."""
        self._evaluate_new2current(score_new)
        self._evaluate_current2best()

    def _propose_location(self):
        """Train the surrogate and return the candidate with the highest
        expected improvement; fall back to a random move if training fails.
        """
        try:
            # ``_training`` is provided by the concrete SMBO subclass.
            self._training()
        except ValueError:
            logging.warning(
                "Warning: training sequential model failed. Performing random iteration instead."
            )
            return self.move_random()

        exp_imp = self._expected_improvement()

        # sort candidate positions by descending expected improvement
        index_best = list(exp_imp.argsort()[::-1])
        all_pos_comb_sorted = self.pos_comb[index_best]
        pos_best = all_pos_comb_sorted[0]

        return pos_best