Completed
Push — master ( ce1e03...f67568 )
by Simon
14:21
created

ParallelTemperingOptimizer._anneal_system()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 3
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
# Author: Simon Blanke
2
# Email: [email protected]
3
# License: MIT License
4
5
6
import random
7
8
import numpy as np
9
10
from .simulated_annealing import SimulatedAnnealingOptimizer
11
from ..local import HillClimbingPositioner
12
13
14
class ParallelTemperingOptimizer(SimulatedAnnealingOptimizer):
    """Simulated annealing run on several systems at different temperatures.

    One ``System`` positioner is created per entry of
    ``_opt_args_.system_temperatures``.  The systems are annealed in
    round-robin order and, every ``n_iter_swap`` iterations, pairs of systems
    may exchange their temperatures.
    """

    def __init__(self, _opt_args_):
        """Store the swap interval and the number of systems.

        Parameters
        ----------
        _opt_args_ : object
            Optimizer arguments; ``n_iter_swap`` and ``system_temperatures``
            are read here, the rest is handled by the parent initializer.
        """
        super().__init__(_opt_args_)
        self.n_iter_swap = _opt_args_.n_iter_swap
        self.n_positioners = len(_opt_args_.system_temperatures)

    def _init_annealer(self, _cand_):
        """Create one ``System`` starting at a random position.

        The temperature is looked up with ``self.i``.
        NOTE(review): ``self.i`` and ``self._opt_args_`` are not set in this
        class; presumably the base class maintains them -- confirm.
        """
        temp = self._opt_args_.system_temperatures[self.i]
        _p_ = System(**self._opt_args_.kwargs_opt, temp=temp)

        _p_.pos_new = _cand_._space_.get_random_pos()

        return _p_

    def _swap_pos(self, _cand_):
        """Give every system one chance to exchange its temperature.

        For each system a partner is drawn at random and the two temperatures
        are swapped when the acceptance value from ``_accept_swap`` exceeds a
        uniform draw from [0, 1].

        The partner is drawn from the *other* systems: pairing a system with
        itself can only yield a no-op swap and would waste that system's
        exchange attempt.  With a single system the full list is used, so
        that case behaves as before.

        ``_cand_`` is unused and kept for interface compatibility.
        """
        for _p1_ in self.p_list:
            rand = random.uniform(0, 1)

            # Identity comparison: exclude exactly this object, nothing else.
            partners = [_p_ for _p_ in self.p_list if _p_ is not _p1_]
            _p2_ = np.random.choice(partners or self.p_list)

            if self._accept_swap(_p1_, _p2_) > rand:
                _p1_.temp, _p2_.temp = _p2_.temp, _p1_.temp

    def _accept_swap(self, _p1_, _p2_):
        """Return the acceptance value for swapping two temperatures.

        Returns
        -------
        number
            * ``100`` when the scores sum to zero; this exceeds any draw
              from [0, 1], so the swap is always accepted.
            * ``0`` when the score sum is infinite; the swap is never
              accepted.
            * otherwise ``exp(d * (1 / T1 - 1 / T2))`` where ``d`` is the
              score difference divided by the score sum.
        """
        denom = _p1_.score_current + _p2_.score_current

        if denom == 0:
            # The normalisation below would divide by zero.
            return 100
        if abs(denom) == np.inf:
            return 0

        score_diff_norm = (_p1_.score_current - _p2_.score_current) / denom
        temp = (1 / _p1_.temp) - (1 / _p2_.temp)
        return np.exp(score_diff_norm * temp)

    def _anneal_system(self, _cand_, _p_):
        """Make ``_p_`` the active positioner and run one parent iteration."""
        self._p_ = _p_
        super()._iterate(0, _cand_)

    def _iterate(self, i, _cand_):
        """Anneal the next system in round-robin order, then maybe swap.

        A swap round runs whenever ``n_iter_swap`` is non-zero and ``i`` is a
        multiple of it.  Returns ``_cand_``.
        """
        _p_current = self.p_list[i % len(self.p_list)]

        self._anneal_system(_cand_, _p_current)

        if self.n_iter_swap != 0 and i % self.n_iter_swap == 0:
            self._swap_pos(_cand_)

        return _cand_

    def _init_iteration(self, _cand_):
        """Create a system, evaluate it, update its position and return it."""
        p = self._init_annealer(_cand_)

        self._optimizer_eval(_cand_, p)
        self._update_pos(_cand_, p)

        return p

    def _finish_search(self):
        """Close the progress bar."""
        self._pbar_.close_p_bar()
78
79
80
class System(HillClimbingPositioner):
    """Hill-climbing positioner that also carries a temperature ``temp``."""

    def __init__(self, *args, **kwargs):
        """Initialise the parent with all arguments, then store ``temp``.

        ``temp`` must be passed as a keyword argument; it is forwarded to the
        parent initializer together with everything else, and a missing
        ``temp`` raises ``KeyError`` after the parent has been initialised.
        """
        super().__init__(*args, **kwargs)
        self.temp = kwargs["temp"]
84