ParallelTemperingOptimizer.evaluate()   A
last analyzed

Complexity

Conditions 3

Size

Total Lines 9
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 7
nop 2
dl 0
loc 9
rs 10
c 0
b 0
f 0
1
# Author: Simon Blanke
2
# Email: [email protected]
3
# License: MIT License
4
5
import copy
6
import random
7
8
import numpy as np
9
10
from .base_population_optimizer import BasePopulationOptimizer
11
from ..local_opt import SimulatedAnnealingOptimizer
12
13
14
class ParallelTemperingOptimizer(BasePopulationOptimizer):
    """Population optimizer built from several simulated-annealing systems.

    Each system is a ``SimulatedAnnealingOptimizer`` that is given its own
    randomly drawn temperature. Trials are handed to the systems in
    round-robin order, and every ``n_iter_swap`` trials the systems may
    exchange their temperatures with one another.

    Parameters
    ----------
    search_space, initialize, constraints, random_state, rand_rest_p, nth_process
        Forwarded unchanged to ``BasePopulationOptimizer.__init__``.
    population
        Stored as ``self.population``; presumably read by
        ``_create_population`` to size the population (confirm in the
        base class).
    n_iter_swap
        Temperature swaps are attempted whenever the trial counter is a
        multiple of this value. ``0`` disables swapping altogether.
    """

    name = "Parallel Tempering"
    _name_ = "parallel_tempering"
    __name__ = "ParallelTemperingOptimizer"

    optimizer_type = "population"
    computationally_expensive = False

    def __init__(
        self,
        search_space,
        initialize={"grid": 4, "random": 2, "vertices": 4},
        constraints=[],
        random_state=None,
        rand_rest_p=0,
        nth_process=None,
        population=5,
        n_iter_swap=5,
    ):
        # NOTE: the mutable defaults are kept as-is to leave the public
        # signature untouched; they are only passed on, never mutated here.
        super().__init__(
            search_space=search_space,
            initialize=initialize,
            constraints=constraints,
            random_state=random_state,
            rand_rest_p=rand_rest_p,
            nth_process=nth_process,
        )

        self.population = population
        self.n_iter_swap = n_iter_swap

        self.systems = self._create_population(SimulatedAnnealingOptimizer)
        for system in self.systems:
            # Temperatures are spread between 1.1**0 = 1 and 1.1**25 (~10.8).
            system.temp = 1.1 ** random.uniform(0, 25)
        # Same list object under a second name.
        self.optimizers = self.systems

    def _swap_pos(self):
        """Give every system one chance to swap temperature with another.

        For each system a partner is drawn at random from the remaining
        systems; the two exchange their ``temp`` values when the acceptance
        value from ``_accept_swap`` exceeds a uniform draw from [0, 100].
        """
        for system in self.systems:
            partners = copy.copy(self.systems)
            # With a single system there is no other partner; it is then
            # paired with itself and the swap is a no-op.
            if len(partners) > 1:
                partners.remove(system)

            threshold = random.uniform(0, 1) * 100
            partner = np.random.choice(partners)

            if self._accept_swap(system, partner) > threshold:
                system.temp, partner.temp = partner.temp, system.temp

    def _accept_swap(self, _p1_, _p2_):
        """Return the swap acceptance value for two systems, in percent.

        The value is ``exp(d * (1/T1 - 1/T2)) * 100`` where ``d`` is the
        difference of the two current scores divided by their sum. It is
        not clipped and may exceed 100.
        """
        score_sum = _p1_.score_current + _p2_.score_current

        # A zero sum would divide by zero below: always accept.
        if score_sum == 0:
            return 100
        # An infinite sum makes the normalised difference meaningless:
        # never accept.
        if abs(score_sum) == np.inf:
            return 0

        score_diff_norm = (_p1_.score_current - _p2_.score_current) / score_sum
        inv_temp_diff = (1 / _p1_.temp) - (1 / _p2_.temp)
        return np.exp(score_diff_norm * inv_temp_diff) * 100

    @BasePopulationOptimizer.track_new_pos
    def init_pos(self):
        """Select the system for this trial and return its initial position."""
        self.p_current = self.systems[self.nth_trial % len(self.systems)]
        return self.p_current.init_pos()

    @BasePopulationOptimizer.track_new_pos
    def iterate(self):
        """Select the system for this trial and return its next position."""
        self.p_current = self.systems[self.nth_trial % len(self.systems)]
        return self.p_current.iterate()

    @BasePopulationOptimizer.track_new_score
    def evaluate(self, score_new):
        """Attempt temperature swaps when due, then score the current system.

        Swaps are attempted when ``nth_trial`` is a multiple of
        ``n_iter_swap``; ``score_new`` is then passed to the current
        system's ``evaluate``.
        """
        # The zero check must short-circuit the modulo: evaluating
        # ``nth_trial % 0`` first would raise ZeroDivisionError and make
        # ``n_iter_swap=0`` (swapping disabled) unusable.
        swap_due = (
            self.n_iter_swap != 0 and self.nth_trial % self.n_iter_swap == 0
        )
        if swap_due:
            self._swap_pos()

        self.p_current.evaluate(score_new)
97