Passed
Pull Request — develop (#33)
by
unknown
01:24
created

circuitbreaker.CircuitBreaker.decorate()   B

Complexity

Conditions 6

Size

Total Lines 26
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 15
CRAP Score 6.0585

Importance

Changes 0
Metric Value
cc 6
eloc 18
nop 2
dl 0
loc 26
ccs 15
cts 17
cp 0.8824
crap 6.0585
rs 8.5666
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2 1
from __future__ import unicode_literals
3 1
from __future__ import division
4 1
from __future__ import print_function
5 1
from __future__ import absolute_import
6
7 1
from functools import wraps
8 1
from datetime import datetime, timedelta
9 1
from inspect import isgeneratorfunction
10 1
from typing import AnyStr, Iterable
11 1
from math import ceil, floor
12
13 1
try:
14 1
  from time import monotonic
15
except ImportError:
16
  from monotonic import monotonic
17
18 1
STATE_CLOSED = 'closed'
19 1
STATE_OPEN = 'open'
20 1
STATE_HALF_OPEN = 'half_open'
21
22
23 1
class CircuitBreaker(object):
24 1
    FAILURE_THRESHOLD = 5
25 1
    RECOVERY_TIMEOUT = 30
26 1
    EXPECTED_EXCEPTION = Exception
27 1
    FALLBACK_FUNCTION = None
28
29 1
    def __init__(self,
30
                 failure_threshold=None,
31
                 recovery_timeout=None,
32
                 expected_exception=None,
33
                 name=None,
34
                 fallback_function=None):
35 1
        self._last_failure = None
36 1
        self._failure_count = 0
37 1
        self._failure_threshold = failure_threshold or self.FAILURE_THRESHOLD
38 1
        self._recovery_timeout = recovery_timeout or self.RECOVERY_TIMEOUT
39 1
        self._expected_exception = expected_exception or self.EXPECTED_EXCEPTION
40 1
        self._fallback_function = fallback_function or self.FALLBACK_FUNCTION
41 1
        self._name = name
42 1
        self._state = STATE_CLOSED
43 1
        self._opened = monotonic()
44
45 1
    def __call__(self, wrapped):
46 1
        return self.decorate(wrapped)
47
48 1
    def __enter__(self):
49 1
        return None
50
51 1
    def __exit__(self, exc_type, exc_value, _traceback):
52 1
        if exc_type and issubclass(exc_type, self._expected_exception):
53
            # exception was raised and is our concern
54 1
            self._last_failure = exc_value
55 1
            self.__call_failed()
56
        else:
57 1
            self.__call_succeeded()
58 1
        return False  # return False to raise exception if any
59
60 1
    def decorate(self, function):
61
        """
62
        Applies the circuit breaker to a function
63
        """
64 1
        if self._name is None:
65 1
            try:
66 1
                self._name = function.__qualname__
67
            except AttributeError:
68
                self._name = function.__name__
69
70 1
        CircuitBreakerMonitor.register(self)
71
72 1
        if isgeneratorfunction(function):
73 1
            call = self.call_generator
74
        else:
75 1
            call = self.call
76
77 1
        @wraps(function)
78
        def wrapper(*args, **kwargs):
79 1
            if self.opened:
80 1
                if self.fallback_function:
81 1
                    return self.fallback_function(*args, **kwargs)
82 1
                raise CircuitBreakerError(self)
83 1
            return call(function, *args, **kwargs)
84
85 1
        return wrapper
86
87 1
    def call(self, func, *args, **kwargs):
88
        """
89
        Calls the decorated function and applies the circuit breaker
90
        rules on success or failure
91
        :param func: Decorated function
92
        """
93 1
        with self:
94 1
            return func(*args, **kwargs)
95
96 1
    def call_generator(self, func, *args, **kwargs):
97
        """
98
        Calls the decorated generator function and applies the circuit breaker
99
        rules on success or failure
100
        :param func: Decorated generator function
101
        """
102 1
        with self:
103 1
            for el in func(*args, **kwargs):
104 1
                yield el
105
106 1
    def __call_succeeded(self):
107
        """
108
        Close circuit after successful execution and reset failure count
109
        """
110 1
        self._state = STATE_CLOSED
111 1
        self._last_failure = None
112 1
        self._failure_count = 0
113
114 1
    def __call_failed(self):
115
        """
116
        Count failure and open circuit, if threshold has been reached
117
        """
118 1
        self._failure_count += 1
119 1
        if self._failure_count >= self._failure_threshold:
120 1
            self._state = STATE_OPEN
121 1
            self._opened = monotonic()
122
123 1
    @property
124
    def state(self):
125 1
        if self._state == STATE_OPEN and self.open_remaining <= 0:
126 1
            return STATE_HALF_OPEN
127 1
        return self._state
128
129 1
    @property
130
    def open_until(self):
131
        """
132
        The approximate datetime when the circuit breaker will try to recover
133
        :return: datetime
134
        """
135 1
        return datetime.utcnow() + timedelta(seconds=self.open_remaining)
136
137 1
    @property
138
    def open_remaining(self):
139
        """
140
        Number of seconds remaining, the circuit breaker stays in OPEN state
141
        :return: int
142
        """
143 1
        remain = (self._opened + self._recovery_timeout) - monotonic()
144 1
        return ceil(remain) if remain > 0 else floor(remain)
145
146 1
    @property
147
    def failure_count(self):
148 1
        return self._failure_count
149
150 1
    @property
151
    def closed(self):
152 1
        return self.state == STATE_CLOSED
153
154 1
    @property
155
    def opened(self):
156 1
        return self.state == STATE_OPEN
157
158 1
    @property
159
    def name(self):
160 1
        return self._name
161
162 1
    @property
163
    def last_failure(self):
164 1
        return self._last_failure
165
166 1
    @property
167
    def fallback_function(self):
168 1
        return self._fallback_function
169
170 1
    def __str__(self, *args, **kwargs):
171 1
        return self._name
172
173
174 1
class CircuitBreakerError(Exception):
175 1
    def __init__(self, circuit_breaker, *args, **kwargs):
176
        """
177
        :param circuit_breaker:
178
        :param args:
179
        :param kwargs:
180
        :return:
181
        """
182 1
        super(CircuitBreakerError, self).__init__(*args, **kwargs)
183 1
        self._circuit_breaker = circuit_breaker
184
185 1
    def __str__(self, *args, **kwargs):
186 1
        return 'Circuit "%s" OPEN until %s (%d failures, %d sec remaining) (last_failure: %r)' % (
187
            self._circuit_breaker.name,
188
            self._circuit_breaker.open_until,
189
            self._circuit_breaker.failure_count,
190
            round(self._circuit_breaker.open_remaining),
191
            self._circuit_breaker.last_failure,
192
        )
193
194
195 1
class CircuitBreakerMonitor(object):
196 1
    circuit_breakers = {}
197
198 1
    @classmethod
199
    def register(cls, circuit_breaker):
200 1
        cls.circuit_breakers[circuit_breaker.name] = circuit_breaker
201
202 1
    @classmethod
203
    def all_closed(cls):
204
        # type: () -> bool
205 1
        return len(list(cls.get_open())) == 0
206
207 1
    @classmethod
208
    def get_circuits(cls):
209
        # type: () -> Iterable[CircuitBreaker]
210 1
        return cls.circuit_breakers.values()
211
212 1
    @classmethod
213
    def get(cls, name):
214
        # type: (AnyStr) -> CircuitBreaker
215 1
        return cls.circuit_breakers.get(name)
216
217 1
    @classmethod
218
    def get_open(cls):
219
        # type: () -> Iterable[CircuitBreaker]
220 1
        for circuit in cls.get_circuits():
221 1
            if circuit.opened:
222 1
                yield circuit
223
224 1
    @classmethod
225
    def get_closed(cls):
226
        # type: () -> Iterable[CircuitBreaker]
227 1
        for circuit in cls.get_circuits():
228 1
            if circuit.closed:
229 1
                yield circuit
230
231
232 1
def circuit(failure_threshold=None,
233
            recovery_timeout=None,
234
            expected_exception=None,
235
            name=None,
236
            fallback_function=None,
237
            cls=CircuitBreaker):
238
239
    # if the decorator is used without parameters, the
240
    # wrapped function is provided as first argument
241 1
    if callable(failure_threshold):
242 1
        return cls().decorate(failure_threshold)
243
    else:
244 1
        return cls(
245
            failure_threshold=failure_threshold,
246
            recovery_timeout=recovery_timeout,
247
            expected_exception=expected_exception,
248
            name=name,
249
            fallback_function=fallback_function)
250