split()   A
last analyzed

Complexity

Conditions 4

Size

Total Lines 14
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 11
nop 2
dl 0
loc 14
rs 9.85
c 0
b 0
f 0
1
# Author: Simon Blanke
2
# Email: [email protected]
3
# License: MIT License
4
5
import math
6
import numpy as np
7
8
from ..core_optimizer import CoreOptimizer
9
10
11
def split(positions_l, population):
12
    div_int = math.ceil(len(positions_l) / population)
13
    dist_init_positions = []
14
15
    for nth_indiv in range(population):
16
        indiv_pos = []
17
        for nth_indiv_pos in range(div_int):
18
            idx = nth_indiv + nth_indiv_pos * population
19
            if idx < len(positions_l):
20
                indiv_pos.append(positions_l[idx])
21
22
        dist_init_positions.append(indiv_pos)
23
24
    return dist_init_positions
25
26
27
class BasePopulationOptimizer(CoreOptimizer):
28
    def __init__(
29
        self,
30
        search_space,
31
        initialize={"grid": 4, "random": 2, "vertices": 4},
32
        constraints=[],
33
        random_state=None,
34
        rand_rest_p=0,
35
        nth_process=None,
36
    ):
37
        super().__init__(
38
            search_space=search_space,
39
            initialize=initialize,
40
            constraints=constraints,
41
            random_state=random_state,
42
            rand_rest_p=rand_rest_p,
43
            nth_process=nth_process,
44
        )
45
46
        self.eval_times = []
47
        self.iter_times = []
48
49
        self.init_done = False
50
51
    def _iterations(self, positioners):
52
        nth_iter = 0
53
        for p in positioners:
54
            nth_iter = nth_iter + len(p.pos_new_list)
55
56
        return nth_iter
57
58
    def sort_pop_best_score(self):
59
        scores_list = []
60
        for _p_ in self.optimizers:
61
            scores_list.append(_p_.score_current)
62
63
        scores_np = np.array(scores_list)
64
        idx_sorted_ind = list(scores_np.argsort()[::-1])
65
66
        self.pop_sorted = [self.optimizers[i] for i in idx_sorted_ind]
67
68
    def _create_population(self, Optimizer):
69
        if isinstance(self.population, int):
70
            pop_size = self.population
71
        else:
72
            pop_size = len(self.population)
73
        diff_init = pop_size - self.init.n_inits
74
75
        if diff_init > 0:
76
            self.init.add_n_random_init_pos(diff_init)
77
78
        if isinstance(self.population, int):
79
            distributed_init_positions = split(
80
                self.init.init_positions_l, self.population
81
            )
82
83
            population = []
84
            for init_positions in distributed_init_positions:
85
                init_values = self.conv.positions2values(init_positions)
86
                init_paras = self.conv.values2paras(init_values)
87
88
                population.append(
89
                    Optimizer(
90
                        self.conv.search_space,
91
                        rand_rest_p=self.rand_rest_p,
92
                        initialize={"warm_start": init_paras},
93
                        constraints=self.constraints,
94
                    )
95
                )
96
        else:
97
            population = self.population
98
99
        return population
100
101
    @CoreOptimizer.track_new_score
102
    def evaluate_init(self, score_new):
103
        self.p_current.evaluate_init(score_new)
104
105
    def finish_initialization(self):
106
        self.search_state = "iter"
107