Passed
Push — master ( d39371...69bf6f )
by Simon
03:38
created

gradient_free_optimizers.optimizers.population.parallel_tempering   A

Complexity

Total Complexity 13

Size/Duplication

Total Lines 82
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 53
dl 0
loc 82
rs 10
c 0
b 0
f 0
wmc 13

7 Methods

Rating   Name   Duplication   Size   Complexity  
A ParallelTemperingOptimizer.iterate() 0 6 1
A ParallelTemperingOptimizer._accept_swap() 0 12 3
A ParallelTemperingOptimizer.__init__() 0 6 1
A ParallelTemperingOptimizer.init_pos() 0 6 1
A ParallelTemperingOptimizer.evaluate() 0 13 3
A System.__init__() 0 3 1
A ParallelTemperingOptimizer._swap_pos() 0 12 3
1
# Author: Simon Blanke
2
# Email: [email protected]
3
# License: MIT License
4
5
6
import random
7
8
import numpy as np
9
10
from .base_population_optimizer import BasePopulationOptimizer
11
from ...search import Search
12
from ..local import SimulatedAnnealingOptimizer
13
14
15
class ParallelTemperingOptimizer(BasePopulationOptimizer, Search):
    """Parallel tempering over a population of simulated-annealing systems.

    One ``SimulatedAnnealingOptimizer`` is created per call to ``init_pos``.
    ``iterate`` selects a system from the iteration counter and advances it;
    ``evaluate`` scores that system and, every ``n_iter_swap`` iterations,
    gives the systems a chance to exchange temperatures.
    """

    def __init__(self, search_space, n_iter_swap=10):
        """Set up an empty population.

        Args:
            search_space: Forwarded unchanged to the base-class initializer.
            n_iter_swap: Number of iterations between temperature-swap
                attempts. ``0`` disables swapping.
        """
        super().__init__(search_space)

        self.n_iter_swap = n_iter_swap
        self.systems = []

    def _swap_pos(self):
        """Give every system one chance to trade temperature with a partner."""
        # Snapshot of the population used as the partner pool for this pass.
        candidates = self.systems[:]

        for _p1_ in self.systems:
            rand = random.uniform(0, 1)
            # The partner is drawn from the full pool, so it may be _p1_
            # itself; the swap is then a no-op.
            _p2_ = np.random.choice(candidates)

            if self._accept_swap(_p1_, _p2_) > rand:
                _p1_.temp, _p2_.temp = _p2_.temp, _p1_.temp

    def _accept_swap(self, _p1_, _p2_):
        """Return the acceptance value for swapping the two systems' temps.

        The score difference is normalised by the score sum and weighted by
        the difference of inverse temperatures. The caller compares the
        result against a uniform draw from [0, 1].

        Returns:
            ``100`` when the scores sum to zero (always beats the draw),
            ``0`` when the sum is infinite (never beats the draw), otherwise
            ``exp(score_diff_norm * (1/T1 - 1/T2))``.
        """
        denom = _p1_.score_current + _p2_.score_current

        if denom == 0:
            # Normalisation is undefined; force acceptance.
            return 100
        if abs(denom) == np.inf:
            # An infinite score would poison the exponent; force rejection.
            return 0

        score_diff_norm = (_p1_.score_current - _p2_.score_current) / denom
        temp = (1 / _p1_.temp) - (1 / _p2_.temp)
        return np.exp(score_diff_norm * temp)

    def init_pos(self, pos):
        """Create a new annealing system at ``pos`` and make it current."""
        system = SimulatedAnnealingOptimizer(self.space_dim)
        self.systems.append(system)
        system.init_pos(pos)

        self.p_current = system

    def iterate(self):
        """Select the system for this iteration and return its next position."""
        # NOTE(review): _iterations is defined outside this class; presumably
        # it returns the total iteration count across the systems - confirm.
        nth_iter = self._iterations(self.systems)
        self.p_current = self.systems[nth_iter % len(self.systems)]

        return self.p_current.iterate()

    def evaluate(self, score_new):
        """Record ``score_new`` for the current system, swapping temps if due.

        Args:
            score_new: Score of the position last returned by ``iterate``.
        """
        nth_iter = self._iterations(self.systems)

        # Check for zero first: with n_iter_swap == 0 (swapping disabled) the
        # modulo would raise ZeroDivisionError before the guard could apply.
        if self.n_iter_swap != 0 and nth_iter % self.n_iter_swap == 0:
            self._swap_pos()

        self.p_current.score_new = score_new

        self.p_current._evaluate_new2current(score_new)
        self.p_current._evaluate_current2best()
76
77
78
class System:
    """Container for one tempering system's settings and temperature.

    NOTE(review): this class declares no base class, so forwarding
    ``space_dim`` and ``_opt_args_`` to ``super().__init__`` reached
    ``object.__init__`` and raised ``TypeError`` on every instantiation.
    The values are stored on the instance instead. If the class is meant to
    extend an optimizer, declare that base and forward the arguments to it.
    """

    def __init__(self, space_dim, _opt_args_, temp):
        """Store the system's configuration.

        Args:
            space_dim: Search-space dimensions for this system.
            _opt_args_: Optimizer arguments for this system.
            temp: Temperature assigned to this system.
        """
        super().__init__()
        self.space_dim = space_dim
        self._opt_args_ = _opt_args_
        self.temp = temp
82