Completed
Branch develop (641d1e)
by Fabian
01:02
created

CircuitBreaker.opened()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 3
rs 10
from datetime import datetime, timedelta
from functools import wraps
from typing import Iterable, Iterator, Optional

# Values reported by CircuitBreaker.state
STATE_CLOSED = 'closed'
STATE_OPEN = 'open'
STATE_HALF_OPEN = 'half_open'


class CircuitBreaker(object):
    """
    Decorator that stops calling a function after repeated failures

    Failures are counted per instance. Once the count reaches the
    threshold the circuit opens and calls are rejected with
    CircuitBreakerError until the recover timeout has elapsed.
    """

    def __init__(self, expected_exception=Exception, failure_threshold=5,
                 recover_timeout=30, name=None):
        """
        :param expected_exception: Exception type(s) counted as a failure
        :param failure_threshold: Number of failures that opens the circuit
        :param recover_timeout: Seconds the circuit stays open before a
            call is let through again
        :param name: Name of the circuit; defaults to the name of the
            decorated function
        """
        self._expected_exception = expected_exception
        self._failure_count = 0
        self._failure_threshold = failure_threshold
        self._recover_timeout = recover_timeout
        self._state = STATE_CLOSED
        self._opened = datetime.utcnow()
        self._name = name

    def __call__(self, wrapped):
        """
        Applies the circuit breaker decorator to a function

        :param wrapped: Function to protect
        :return: Wrapper that routes every invocation through call()
        """
        if self._name is None:
            self._name = wrapped.__name__

        # Registration happens at decoration time, keyed by name
        CircuitBreakerMonitor.register(self)

        @wraps(wrapped)
        def wrapper(*args, **kwargs):
            return self.call(wrapped, *args, **kwargs)

        return wrapper

    def call(self, func, *args, **kwargs):
        """
        Calls the decorated function and applies the circuit breaker
        rules on success or failure

        :param func: Decorated function
        :param args: Positional arguments passed on to func
        :param kwargs: Keyword arguments passed on to func
        :return: Whatever func returns
        :raises CircuitBreakerError: if the circuit is open and the
            recover timeout has not elapsed yet
        """
        if not self.closed:
            raise CircuitBreakerError(self)
        try:
            result = func(*args, **kwargs)
        except self._expected_exception:
            # Only the expected exception type counts as a failure;
            # it is re-raised unchanged for the caller
            self.__failure()
            raise

        self.__success()
        return result

    def __success(self):
        """
        Close circuit after successful execution
        """
        self.close()

    def __failure(self):
        """
        Count failure and open circuit, if threshold has been reached
        """
        self._failure_count += 1
        if self._failure_count >= self._failure_threshold:
            self.open()

    def open(self):
        """
        Open the circuit breaker and restart the recover timeout
        """
        self._state = STATE_OPEN
        self._opened = datetime.utcnow()

    def close(self):
        """
        Close the circuit breaker and reset the failure count
        """
        self._state = STATE_CLOSED
        self._failure_count = 0

    @property
    def open_until(self):
        """
        The datetime, when the circuit breaker will try to recover
        :return: naive UTC datetime
        """
        return self._opened + timedelta(seconds=self._recover_timeout)

    @property
    def open_remaining(self):
        """
        Number of seconds remaining, the circuit breaker stays in OPEN state
        :return: float, zero or negative once the timeout has elapsed
        """
        return (self.open_until - datetime.utcnow()).total_seconds()

    @property
    def failure_count(self):
        """
        Number of failures counted since the circuit was last closed
        """
        return self._failure_count

    @property
    def closed(self):
        """
        True unless the circuit is in OPEN state (HALF_OPEN counts as closed)
        """
        # Compare by value: identity of equal strings is not guaranteed
        return self.state != STATE_OPEN

    @property
    def opened(self):
        """
        True while the circuit is in OPEN state
        """
        return not self.closed

    @property
    def name(self):
        """
        Name of the circuit, None until set or until a function is decorated
        """
        return self._name

    @property
    def state(self):
        """
        Current state; OPEN is reported as HALF_OPEN once the recover
        timeout has elapsed
        """
        if self._state == STATE_OPEN and self.open_remaining <= 0:
            return STATE_HALF_OPEN
        return self._state

    def __str__(self, *args, **kwargs):
        # __str__ must return a str; the name is None before decoration
        return str(self._name)
120
121
122
class CircuitBreakerError(Exception):
    """
    Raised instead of calling the function while its circuit is open
    """

    def __init__(self, circuit_breaker, *args, **kwargs):
        """
        :param circuit_breaker: Circuit breaker that rejected the call
        :param args: Passed on to Exception
        :param kwargs: Passed on to Exception
        """
        super().__init__(*args, **kwargs)
        self._circuit_breaker = circuit_breaker

    def __str__(self, *args, **kwargs):
        # Values are read from the breaker when the message is rendered,
        # not when the error was created
        return 'Circuit "%s" OPEN until %s (%d failures, %d sec remaining)' % (
            self._circuit_breaker.name,
            self._circuit_breaker.open_until,
            self._circuit_breaker.failure_count,
            round(self._circuit_breaker.open_remaining)
        )
140
141
142
class CircuitBreakerMonitor(object):
    """
    Class-level registry of circuit breakers, keyed by their name
    """
    # Shared by every user of the class: name -> circuit breaker
    circuit_breakers = {}

    @classmethod
    def register(cls, circuit_breaker):
        """
        Store a circuit breaker under its name

        A breaker registered earlier under the same name is replaced.

        :param circuit_breaker: Circuit breaker to register
        """
        cls.circuit_breakers[circuit_breaker.name] = circuit_breaker

    @classmethod
    def all_closed(cls) -> bool:
        """
        :return: True if no registered circuit breaker is open
        """
        return not any(circuit.opened for circuit in cls.get_circuits())

    @classmethod
    def get_circuits(cls) -> Iterable['CircuitBreaker']:
        """
        :return: View of all registered circuit breakers
        """
        return cls.circuit_breakers.values()

    @classmethod
    def get(cls, name) -> Optional['CircuitBreaker']:
        """
        :param name: Name the circuit breaker was registered under
        :return: The circuit breaker, or None if the name is unknown
        """
        return cls.circuit_breakers.get(name)

    @classmethod
    def get_open(cls) -> Iterator['CircuitBreaker']:
        """
        :return: Generator over the registered circuit breakers that are open
        """
        for circuit in cls.get_circuits():
            if circuit.opened:
                yield circuit

    @classmethod
    def get_closed(cls) -> Iterator['CircuitBreaker']:
        """
        :return: Generator over the registered circuit breakers that are closed
        """
        for circuit in cls.get_circuits():
            if circuit.closed:
                yield circuit
172