Passed
Pull Request — develop (#16)
by
unknown
07:06
created

circuitbreaker.CircuitBreaker.call_generator()   A

Complexity

Conditions 2

Size

Total Lines 8
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 3
nop 4
dl 0
loc 8
ccs 6
cts 6
cp 1
crap 2
rs 10
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2 1
from __future__ import unicode_literals
3 1
from __future__ import division
4 1
from __future__ import print_function
5 1
from __future__ import absolute_import
6
7 1
from functools import wraps
8 1
from datetime import datetime, timedelta
9 1
from inspect import iscoroutinefunction, isgeneratorfunction
10
from typing import AnyStr, Iterable
11 1
12 1
# Circuit states, stored and compared as plain strings.
STATE_CLOSED = 'closed'        # calls are passed through to the wrapped callable
STATE_OPEN = 'open'            # failure threshold reached; calls are short-circuited
STATE_HALF_OPEN = 'half_open'  # recovery timeout elapsed; the next call is a trial
15
16 1
17 1
class CircuitBreaker(object):
    """
    Circuit breaker usable as a decorator or as a context manager.

    Exceptions matching the expected exception type are counted as failures.
    Once the failure count reaches the threshold the circuit is opened and
    decorated callables are short-circuited (fallback function or
    ``CircuitBreakerError``) until the recovery timeout has elapsed. After
    that the circuit reports ``half_open`` and the next call is let through:
    a success closes the circuit again, a failure re-opens it.
    """

    # Class-level defaults, used whenever the matching constructor
    # argument is falsy. Subclasses may override them.
    FAILURE_THRESHOLD = 5
    RECOVERY_TIMEOUT = 30  # seconds
    EXPECTED_EXCEPTION = Exception
    FALLBACK_FUNCTION = None

    def __init__(self,
                 failure_threshold=None,
                 recovery_timeout=None,
                 expected_exception=None,
                 name=None,
                 fallback_function=None):
        """
        :param failure_threshold: number of counted failures that opens the
            circuit
        :param recovery_timeout: seconds the circuit stays open before it
            reports ``half_open``
        :param expected_exception: exception type (or tuple of types) that
            counts as a failure
        :param name: name of the circuit; defaults to the name of the
            decorated function once ``decorate`` is called
        :param fallback_function: callable invoked with the original
            arguments instead of raising while the circuit is open

        Note: defaults are applied with ``or``, so any falsy value (``None``,
        ``0``, ...) selects the class-level default.
        """
        self._last_failure = None
        self._failure_count = 0
        self._failure_threshold = failure_threshold or self.FAILURE_THRESHOLD
        self._recovery_timeout = recovery_timeout or self.RECOVERY_TIMEOUT
        self._expected_exception = expected_exception or self.EXPECTED_EXCEPTION
        self._fallback_function = fallback_function or self.FALLBACK_FUNCTION
        self._name = name
        self._state = STATE_CLOSED
        # Naive UTC timestamp; only meaningful while the state is open.
        self._opened = datetime.utcnow()

    def __call__(self, wrapped):
        """Allow an instance to be used directly as a decorator."""
        return self.decorate(wrapped)

    def __enter__(self):
        """Enter the guarded section; nothing is bound by ``with ... as``."""
        return None

    def __exit__(self, exc_type, exc_value, traceback):
        """
        Record the outcome of the guarded section.

        An exception that is a subclass of the expected exception counts as
        a failure; anything else (including no exception) counts as success.
        """
        if exc_type and issubclass(exc_type, self._expected_exception):
            # exception was raised and is our concern
            self._last_failure = exc_value
            self.__call_failed()
        else:
            self.__call_succeeded()
        return False  # return False to raise exception if any

    def decorate(self, function):
        """
        Applies the circuit breaker to a function

        The breaker takes the function's name if it has none yet and is
        registered with ``CircuitBreakerMonitor`` under that name.

        :param function: plain, generator or coroutine function to protect
        :return: wrapper with the metadata of ``function``
        """
        if self._name is None:
            self._name = function.__name__

        CircuitBreakerMonitor.register(self)

        # Pick the call strategy once, at decoration time.
        if iscoroutinefunction(function):
            call = self.call_async
        elif isgeneratorfunction(function):
            call = self.call_generator
        else:
            call = self.call

        @wraps(function)
        def wrapper(*args, **kwargs):
            # The open check happens when the wrapper is called, i.e. before
            # a generator is iterated or a coroutine is awaited.
            if self.opened:
                if self.fallback_function:
                    return self.fallback_function(*args, **kwargs)
                raise CircuitBreakerError(self)
            return call(function, *args, **kwargs)

        return wrapper

    def call(self, func, *args, **kwargs):
        """
        Calls the decorated function and applies the circuit breaker
        rules on success or failure

        :param func: Decorated function
        :return: whatever ``func`` returns
        """
        with self:
            return func(*args, **kwargs)

    def call_generator(self, func, *args, **kwargs):
        """
        Calls the decorated generator function and applies the circuit breaker
        rules on success or failure

        The outcome is recorded when the delegated generator finishes or
        raises, not when this generator is created.

        :param func: Decorated generator function
        :return: generator delegating to ``func(*args, **kwargs)``
        """
        with self:
            return (yield from func(*args, **kwargs))

    async def call_async(self, func, *args, **kwargs):
        """
        Calls the decorated async function and applies the circuit breaker
        rules on success or failure

        :param func: Decorated async function
        :return: the awaited result of ``func``
        """
        with self:
            return await func(*args, **kwargs)

    def __call_succeeded(self):
        """
        Close circuit after successful execution and reset failure count
        """
        self._state = STATE_CLOSED
        self._last_failure = None
        self._failure_count = 0

    def __call_failed(self):
        """
        Count failure and open circuit, if threshold has been reached
        """
        self._failure_count += 1
        if self._failure_count >= self._failure_threshold:
            self._state = STATE_OPEN
            # (Re)start the recovery window from this failure.
            self._opened = datetime.utcnow()

    @property
    def state(self):
        """
        Current state of the circuit.

        A stored ``open`` state is reported as ``half_open`` once the
        recovery timeout has elapsed.
        :return: one of the ``STATE_*`` constants
        """
        if self._state == STATE_OPEN and self.open_remaining <= 0:
            return STATE_HALF_OPEN
        return self._state

    @property
    def open_until(self):
        """
        The datetime, when the circuit breaker will try to recover
        :return: datetime (naive, UTC)
        """
        return self._opened + timedelta(seconds=self._recovery_timeout)

    @property
    def open_remaining(self):
        """
        Number of seconds remaining, the circuit breaker stays in OPEN state

        Negative once the recovery timeout has elapsed.
        :return: float
        """
        return (self.open_until - datetime.utcnow()).total_seconds()

    @property
    def failure_count(self):
        """Number of failures counted since the last success."""
        return self._failure_count

    @property
    def closed(self):
        """True if the reported state is ``closed``."""
        return self.state == STATE_CLOSED

    @property
    def opened(self):
        """True if the reported state is ``open`` (not ``half_open``)."""
        return self.state == STATE_OPEN

    @property
    def name(self):
        """Name of the circuit, or None if it has not been named yet."""
        return self._name

    @property
    def last_failure(self):
        """Exception instance of the last counted failure, if any."""
        return self._last_failure

    @property
    def fallback_function(self):
        """Callable used instead of raising while the circuit is open."""
        return self._fallback_function

    def __str__(self, *args, **kwargs):
        # ``__str__`` must return a string; an unnamed breaker (name is
        # None) would otherwise make ``str()`` raise TypeError.
        return str(self._name)
171 1
172
173
class CircuitBreakerError(Exception):
    """Raised instead of calling a protected function while its circuit is open."""

    def __init__(self, circuit_breaker, *args, **kwargs):
        """
        :param circuit_breaker: the breaker that rejected the call; its
            ``name``, ``open_until``, ``failure_count``, ``open_remaining``
            and ``last_failure`` are read when the error is formatted
        :param args: forwarded to ``Exception.__init__``
        :param kwargs: forwarded to ``Exception.__init__``
        """
        super(CircuitBreakerError, self).__init__(*args, **kwargs)
        self._circuit_breaker = circuit_breaker

    def __str__(self, *args, **kwargs):
        """Describe the open circuit using the breaker's current values."""
        breaker = self._circuit_breaker
        return 'Circuit "%s" OPEN until %s (%d failures, %d sec remaining) (last_failure: %r)' % (
            breaker.name,
            breaker.open_until,
            breaker.failure_count,
            round(breaker.open_remaining),
            breaker.last_failure,
        )
192
193 1
194
class CircuitBreakerMonitor(object):
    """Process-wide registry of circuit breakers, keyed by their name."""

    # Shared on the class on purpose: all access goes through classmethods.
    circuit_breakers = {}

    @classmethod
    def register(cls, circuit_breaker):
        """
        Store ``circuit_breaker`` under its ``name``.

        A breaker registered earlier under the same name is replaced.
        """
        cls.circuit_breakers[circuit_breaker.name] = circuit_breaker

    @classmethod
    def all_closed(cls):
        # type: () -> bool
        """Return True if no registered circuit reports ``opened``."""
        # any() stops at the first open circuit instead of collecting all.
        return not any(circuit.opened for circuit in cls.get_circuits())

    @classmethod
    def get_circuits(cls):
        # type: () -> Iterable[CircuitBreaker]
        """Return a view of all registered circuit breakers."""
        return cls.circuit_breakers.values()

    @classmethod
    def get(cls, name):
        # type: (AnyStr) -> CircuitBreaker
        """Return the breaker registered as ``name``, or None if unknown."""
        return cls.circuit_breakers.get(name)

    @classmethod
    def get_open(cls):
        # type: () -> Iterable[CircuitBreaker]
        """Yield every registered circuit whose ``opened`` is true."""
        for circuit in cls.get_circuits():
            if circuit.opened:
                yield circuit

    @classmethod
    def get_closed(cls):
        # type: () -> Iterable[CircuitBreaker]
        """Yield every registered circuit whose ``closed`` is true."""
        for circuit in cls.get_circuits():
            if circuit.closed:
                yield circuit
229
230
231
def circuit(failure_threshold=None,
            recovery_timeout=None,
            expected_exception=None,
            name=None,
            fallback_function=None,
            cls=CircuitBreaker):
    """
    Decorator factory for circuit breakers.

    Supports both ``@circuit`` and ``@circuit(...)``.

    :param failure_threshold: passed to ``cls``; when the decorator is used
        without parameters this is the wrapped function itself
    :param recovery_timeout: passed to ``cls``
    :param expected_exception: passed to ``cls``
    :param name: passed to ``cls``
    :param fallback_function: passed to ``cls``
    :param cls: circuit breaker class to instantiate
    :return: the decorated function for the bare form, otherwise a ``cls``
        instance to be applied to the function as a decorator
    """
    # if the decorator is used without parameters, the
    # wrapped function is provided as first argument
    if callable(failure_threshold):
        return cls().decorate(failure_threshold)

    return cls(
        failure_threshold=failure_threshold,
        recovery_timeout=recovery_timeout,
        expected_exception=expected_exception,
        name=name,
        fallback_function=fallback_function)
249