1 | # -*- coding: utf-8 -*- |
||
2 | 1 | from __future__ import unicode_literals |
|
3 | 1 | from __future__ import division |
|
4 | 1 | from __future__ import print_function |
|
5 | 1 | from __future__ import absolute_import |
|
6 | |||
7 | 1 | from functools import wraps |
|
8 | 1 | from datetime import datetime, timedelta |
|
9 | 1 | from inspect import isgeneratorfunction |
|
10 | 1 | from typing import AnyStr, Iterable |
|
11 | 1 | from math import ceil, floor |
|
12 | |||
13 | 1 | try: |
|
14 | 1 | from time import monotonic |
|
15 | 1 | except ImportError: |
|
16 | 1 | from monotonic import monotonic |
|
17 | |||
18 | 1 | STATE_CLOSED = 'closed' |
|
19 | 1 | STATE_OPEN = 'open' |
|
20 | 1 | STATE_HALF_OPEN = 'half_open' |
|
21 | |||
22 | |||
23 | 1 | class CircuitBreaker(object): |
|
24 | 1 | FAILURE_THRESHOLD = 5 |
|
25 | 1 | RECOVERY_TIMEOUT = 30 |
|
26 | 1 | EXPECTED_EXCEPTION = Exception |
|
27 | 1 | FALLBACK_FUNCTION = None |
|
28 | |||
29 | 1 | def __init__(self, |
|
30 | failure_threshold=None, |
||
31 | recovery_timeout=None, |
||
32 | expected_exception=None, |
||
33 | name=None, |
||
34 | fallback_function=None): |
||
35 | 1 | self._last_failure = None |
|
36 | 1 | self._failure_count = 0 |
|
37 | 1 | self._failure_threshold = failure_threshold or self.FAILURE_THRESHOLD |
|
38 | 1 | self._recovery_timeout = recovery_timeout or self.RECOVERY_TIMEOUT |
|
39 | 1 | self._expected_exception = expected_exception or self.EXPECTED_EXCEPTION |
|
40 | 1 | self._fallback_function = fallback_function or self.FALLBACK_FUNCTION |
|
41 | 1 | self._name = name |
|
42 | 1 | self._state = STATE_CLOSED |
|
43 | 1 | self._opened = monotonic() |
|
44 | |||
45 | 1 | def __call__(self, wrapped): |
|
46 | 1 | return self.decorate(wrapped) |
|
47 | |||
48 | 1 | def __enter__(self): |
|
49 | 1 | return None |
|
50 | |||
51 | 1 | def __exit__(self, exc_type, exc_value, _traceback): |
|
52 | 1 | if exc_type and issubclass(exc_type, self._expected_exception): |
|
53 | # exception was raised and is our concern |
||
54 | 1 | self._last_failure = exc_value |
|
55 | 1 | self.__call_failed() |
|
56 | else: |
||
57 | 1 | self.__call_succeeded() |
|
58 | 1 | return False # return False to raise exception if any |
|
59 | |||
60 | 1 | def decorate(self, function): |
|
61 | """ |
||
62 | Applies the circuit breaker to a function |
||
63 | """ |
||
64 | 1 | if self._name is None: |
|
65 | 1 | self._name = function.__name__ |
|
66 | |||
67 | 1 | CircuitBreakerMonitor.register(self) |
|
68 | |||
69 | 1 | if isgeneratorfunction(function): |
|
70 | 1 | call = self.call_generator |
|
71 | else: |
||
72 | 1 | call = self.call |
|
73 | |||
74 | 1 | @wraps(function) |
|
75 | def wrapper(*args, **kwargs): |
||
76 | 1 | if self.opened: |
|
77 | 1 | if self.fallback_function: |
|
78 | 1 | return self.fallback_function(*args, **kwargs) |
|
79 | 1 | raise CircuitBreakerError(self) |
|
80 | 1 | return call(function, *args, **kwargs) |
|
81 | |||
82 | 1 | return wrapper |
|
83 | |||
84 | 1 | def call(self, func, *args, **kwargs): |
|
85 | """ |
||
86 | Calls the decorated function and applies the circuit breaker |
||
87 | rules on success or failure |
||
88 | :param func: Decorated function |
||
89 | """ |
||
90 | 1 | with self: |
|
91 | 1 | return func(*args, **kwargs) |
|
92 | |||
93 | 1 | def call_generator(self, func, *args, **kwargs): |
|
94 | """ |
||
95 | Calls the decorated generator function and applies the circuit breaker |
||
96 | rules on success or failure |
||
97 | :param func: Decorated generator function |
||
98 | """ |
||
99 | 1 | with self: |
|
100 | 1 | for el in func(*args, **kwargs): |
|
101 | 1 | yield el |
|
102 | |||
103 | 1 | def __call_succeeded(self): |
|
104 | """ |
||
105 | Close circuit after successful execution and reset failure count |
||
106 | """ |
||
107 | 1 | self._state = STATE_CLOSED |
|
108 | 1 | self._last_failure = None |
|
109 | 1 | self._failure_count = 0 |
|
110 | |||
111 | 1 | def __call_failed(self): |
|
112 | """ |
||
113 | Count failure and open circuit, if threshold has been reached |
||
114 | """ |
||
115 | 1 | self._failure_count += 1 |
|
116 | 1 | if self._failure_count >= self._failure_threshold: |
|
117 | 1 | self._state = STATE_OPEN |
|
118 | 1 | self._opened = monotonic() |
|
119 | |||
120 | 1 | @property |
|
121 | def state(self): |
||
122 | 1 | if self._state == STATE_OPEN and self.open_remaining <= 0: |
|
123 | 1 | return STATE_HALF_OPEN |
|
124 | 1 | return self._state |
|
125 | |||
126 | 1 | @property |
|
127 | def open_until(self): |
||
128 | """ |
||
129 | The approximate datetime when the circuit breaker will try to recover |
||
130 | :return: datetime |
||
131 | """ |
||
132 | 1 | return datetime.utcnow() + timedelta(seconds=self.open_remaining) |
|
133 | |||
134 | 1 | @property |
|
135 | def open_remaining(self): |
||
136 | """ |
||
137 | Number of seconds remaining, the circuit breaker stays in OPEN state |
||
138 | :return: int |
||
139 | """ |
||
140 | 1 | remain = (self._opened + self._recovery_timeout) - monotonic() |
|
141 | 1 | return ceil(remain) if remain > 0 else floor(remain) |
|
142 | |||
143 | 1 | @property |
|
144 | def failure_count(self): |
||
145 | 1 | return self._failure_count |
|
146 | |||
147 | 1 | @property |
|
148 | def closed(self): |
||
149 | 1 | return self.state == STATE_CLOSED |
|
150 | |||
151 | 1 | @property |
|
152 | def opened(self): |
||
153 | 1 | return self.state == STATE_OPEN |
|
154 | |||
155 | 1 | @property |
|
156 | def name(self): |
||
157 | 1 | return self._name |
|
158 | |||
159 | 1 | @property |
|
160 | def last_failure(self): |
||
161 | 1 | return self._last_failure |
|
162 | |||
163 | 1 | @property |
|
164 | def fallback_function(self): |
||
165 | 1 | return self._fallback_function |
|
166 | |||
167 | 1 | def __str__(self, *args, **kwargs): |
|
168 | 1 | return self._name |
|
169 | |||
170 | |||
171 | 1 | class CircuitBreakerError(Exception): |
|
172 | 1 | def __init__(self, circuit_breaker, *args, **kwargs): |
|
173 | """ |
||
174 | :param circuit_breaker: |
||
175 | :param args: |
||
176 | :param kwargs: |
||
177 | :return: |
||
178 | """ |
||
179 | 1 | super(CircuitBreakerError, self).__init__(*args, **kwargs) |
|
180 | 1 | self._circuit_breaker = circuit_breaker |
|
181 | |||
182 | 1 | def __str__(self, *args, **kwargs): |
|
183 | 1 | return 'Circuit "%s" OPEN until %s (%d failures, %d sec remaining) (last_failure: %r)' % ( |
|
184 | self._circuit_breaker.name, |
||
185 | self._circuit_breaker.open_until, |
||
186 | self._circuit_breaker.failure_count, |
||
187 | round(self._circuit_breaker.open_remaining), |
||
188 | self._circuit_breaker.last_failure, |
||
189 | ) |
||
190 | |||
191 | |||
192 | 1 | class CircuitBreakerMonitor(object): |
|
193 | 1 | circuit_breakers = {} |
|
194 | |||
195 | 1 | @classmethod |
|
196 | def register(cls, circuit_breaker): |
||
197 | 1 | cls.circuit_breakers[circuit_breaker.name] = circuit_breaker |
|
198 | |||
199 | 1 | @classmethod |
|
200 | def all_closed(cls): |
||
201 | # type: () -> bool |
||
202 | 1 | return len(list(cls.get_open())) == 0 |
|
203 | |||
204 | 1 | @classmethod |
|
205 | def get_circuits(cls): |
||
206 | # type: () -> Iterable[CircuitBreaker] |
||
207 | 1 | return cls.circuit_breakers.values() |
|
208 | |||
209 | 1 | @classmethod |
|
210 | def get(cls, name): |
||
211 | # type: (AnyStr) -> CircuitBreaker |
||
212 | 1 | return cls.circuit_breakers.get(name) |
|
213 | |||
214 | 1 | @classmethod |
|
215 | def get_open(cls): |
||
216 | # type: () -> Iterable[CircuitBreaker] |
||
217 | 1 | for circuit in cls.get_circuits(): |
|
218 | 1 | if circuit.opened: |
|
219 | 1 | yield circuit |
|
220 | |||
221 | 1 | @classmethod |
|
222 | def get_closed(cls): |
||
223 | # type: () -> Iterable[CircuitBreaker] |
||
224 | 1 | for circuit in cls.get_circuits(): |
|
225 | 1 | if circuit.closed: |
|
226 | 1 | yield circuit |
|
227 | |||
228 | |||
229 | 1 | def circuit(failure_threshold=None, |
|
230 | recovery_timeout=None, |
||
231 | expected_exception=None, |
||
232 | name=None, |
||
233 | fallback_function=None, |
||
234 | cls=CircuitBreaker): |
||
235 | |||
236 | # if the decorator is used without parameters, the |
||
237 | # wrapped function is provided as first argument |
||
238 | 1 | if callable(failure_threshold): |
|
239 | 1 | return cls().decorate(failure_threshold) |
|
240 | else: |
||
241 | 1 | return cls( |
|
242 | failure_threshold=failure_threshold, |
||
243 | recovery_timeout=recovery_timeout, |
||
244 | expected_exception=expected_exception, |
||
245 | name=name, |
||
246 | fallback_function=fallback_function) |
||
247 |