Passed
Push — master ( 626f23...be3b1e )
by Simon
01:48
created

ParallelTemperingOptimizer.__init__()   A

Complexity

Conditions 2

Size

Total Lines 16
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 13
nop 5
dl 0
loc 16
rs 9.75
c 0
b 0
f 0
1
# Author: Simon Blanke
2
# Email: [email protected]
3
# License: MIT License
4
5
import copy
6
import random
7
8
import numpy as np
9
10
from .base_population_optimizer import BasePopulationOptimizer
11
from ...search import Search
12
from ..local_opt import SimulatedAnnealingOptimizer
13
14
15
class ParallelTemperingOptimizer(BasePopulationOptimizer, Search):
    """Population optimizer built from several simulated-annealing systems.

    Each system is a ``SimulatedAnnealingOptimizer`` with its own randomly
    drawn temperature. The systems are stepped in turn, and every
    ``n_iter_swap`` iterations pairs of systems may exchange temperatures.
    """

    name = "Parallel Tempering"

    def __init__(
        self,
        *args,
        population=5,
        n_iter_swap=5,
        **kwargs,
    ):
        """Create the annealing systems and give each a start temperature.

        Parameters
        ----------
        *args, **kwargs
            Forwarded unchanged to the parent initializer.
        population
            Stored as ``self.population``; presumably read by
            ``_create_population`` to size the population (defined outside
            this class -- confirm there).
        n_iter_swap
            Swap attempts are made whenever the iteration counter is a
            multiple of this value. ``0`` disables swapping.
        """
        super().__init__(*args, **kwargs)

        self.population = population
        self.n_iter_swap = n_iter_swap

        self.systems = self._create_population(SimulatedAnnealingOptimizer)
        for system in self.systems:
            # Exponent is uniform in [0, 25], so temperatures fall roughly
            # between 1 and 1.1 ** 25 (about 10.8).
            system.temp = 1.1 ** random.uniform(0, 25)
        self.optimizers = self.systems

    def _swap_pos(self):
        """Attempt one temperature swap per system with a random partner.

        For each system a partner is drawn from the remaining systems (or
        from the system itself when it is the only one). The swap happens
        when the acceptance value from ``_accept_swap`` exceeds a uniform
        random number on the 0-100 scale.
        """
        for _p1_ in self.systems:
            _systems_temp = copy.copy(self.systems)
            if len(_systems_temp) > 1:
                # Exclude the system itself from the partner candidates.
                _systems_temp.remove(_p1_)

            rand = random.uniform(0, 1) * 100
            _p2_ = np.random.choice(_systems_temp)

            p_accept = self._accept_swap(_p1_, _p2_)
            if p_accept > rand:
                _p1_.temp, _p2_.temp = (_p2_.temp, _p1_.temp)

    def _accept_swap(self, _p1_, _p2_):
        """Return the swap acceptance value for two systems in percent.

        The score difference is normalized by the sum of both current
        scores. A sum of zero yields 100 (always accept) and an infinite
        sum yields 0 (never accept). Otherwise the result is
        ``exp(score_diff_norm * (1/T1 - 1/T2)) * 100``, which may exceed 100.
        """
        denom = _p1_.score_current + _p2_.score_current

        if denom == 0:
            return 100
        elif abs(denom) == np.inf:
            return 0
        else:
            score_diff_norm = (_p1_.score_current - _p2_.score_current) / denom

            temp = (1 / _p1_.temp) - (1 / _p2_.temp)
            return np.exp(score_diff_norm * temp) * 100

    def init_pos(self, pos):
        """Select the system for this init step and pass ``pos`` to it."""
        nth_pop = self.nth_iter % len(self.systems)

        self.p_current = self.systems[nth_pop]
        self.p_current.init_pos(pos)

    def iterate(self):
        """Select the next system round-robin and return its ``iterate()``."""
        nth_iter = self._iterations(self.systems)
        self.p_current = self.systems[nth_iter % len(self.systems)]

        return self.p_current.iterate()

    def evaluate(self, score_new):
        """Run a swap round when due, then pass the score to the current system.

        A swap round is triggered when the iteration counter is a multiple
        of ``n_iter_swap``. Setting ``n_iter_swap`` to 0 disables swapping.
        """
        nth_iter = self._iterations(self.systems)

        # Check for zero first: short-circuiting keeps the modulo from
        # raising ZeroDivisionError when swapping is disabled.
        if self.n_iter_swap != 0 and nth_iter % self.n_iter_swap == 0:
            self._swap_pos()

        self.p_current.evaluate(score_new)
83