Passed
Pull Request — master (#12)
by Fabian
14:36 queued 07:07
created

circuitbreaker.CircuitBreaker.fallback_function()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 1
dl 0
loc 3
ccs 2
cts 2
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2 1
from __future__ import unicode_literals
3 1
from __future__ import division
4 1
from __future__ import print_function
5 1
from __future__ import absolute_import
6
7 1
from functools import wraps
8 1
from datetime import datetime, timedelta
9 1
from typing import AnyStr, Iterable
10
11 1
# Circuit state identifiers reported by ``CircuitBreaker.state``:
#   STATE_CLOSED    - initial state; also restored after a successful call.
#   STATE_OPEN      - set once the failure threshold has been reached.
#   STATE_HALF_OPEN - reported in place of OPEN once the recovery timeout
#                     has elapsed (never stored, only derived).
STATE_CLOSED = 'closed'
12 1
STATE_OPEN = 'open'
13 1
STATE_HALF_OPEN = 'half_open'
14
15
16 1
class CircuitBreaker(object):
17 1
    FAILURE_THRESHOLD = 5
18 1
    RECOVERY_TIMEOUT = 30
19 1
    EXPECTED_EXCEPTION = Exception
20 1
    FALLBACK_FUNCTION = None
21
22 1
    def __init__(self,
23
                 failure_threshold=None,
24
                 recovery_timeout=None,
25
                 expected_exception=None,
26
                 name=None,
27
                 fallback_function=None):
28 1
        self._last_failure = None
29 1
        self._failure_count = 0
30 1
        self._failure_threshold = failure_threshold or self.FAILURE_THRESHOLD
31 1
        self._recovery_timeout = recovery_timeout or self.RECOVERY_TIMEOUT
32 1
        self._expected_exception = expected_exception or self.EXPECTED_EXCEPTION
33 1
        self._fallback_function = fallback_function or self.FALLBACK_FUNCTION
34 1
        self._name = name
35 1
        self._state = STATE_CLOSED
36 1
        self._opened = datetime.utcnow()
37
38 1
    def __call__(self, wrapped):
39 1
        return self.decorate(wrapped)
40
41 1
    def decorate(self, function):
42
        """
43
        Applies the circuit breaker to a function
44
        """
45 1
        if self._name is None:
46 1
            self._name = function.__name__
47
48 1
        CircuitBreakerMonitor.register(self)
49
50 1
        @wraps(function)
51
        def wrapper(*args, **kwargs):
52 1
            return self.call(function, *args, **kwargs)
53
54 1
        return wrapper
55
56 1
    def call(self, func, *args, **kwargs):
57
        """
58
        Calls the decorated function and applies the circuit breaker
59
        rules on success or failure
60
        :param func: Decorated function
61
        """
62 1
        if self.opened:
63 1
            if self.fallback_function:
64 1
                self.fallback_function(*args, **kwargs)
65
            else:
66 1
                raise CircuitBreakerError(self)
67 1
        try:
68 1
            result = func(*args, **kwargs)
69 1
        except self._expected_exception as e:
70 1
            self._last_failure = e
71 1
            self.__call_failed()
72 1
            raise
73
74 1
        self.__call_succeeded()
75 1
        return result
76
77 1
    def __call_succeeded(self):
78
        """
79
        Close circuit after successful execution and reset failure count
80
        """
81 1
        self._state = STATE_CLOSED
82 1
        self._last_failure = None
83 1
        self._failure_count = 0
84
85 1
    def __call_failed(self):
86
        """
87
        Count failure and open circuit, if threshold has been reached
88
        """
89 1
        self._failure_count += 1
90 1
        if self._failure_count >= self._failure_threshold:
91 1
            self._state = STATE_OPEN
92 1
            self._opened = datetime.utcnow()
93
94 1
    @property
95
    def state(self):
96 1
        if self._state == STATE_OPEN and self.open_remaining <= 0:
97 1
            return STATE_HALF_OPEN
98 1
        return self._state
99
100 1
    @property
101
    def open_until(self):
102
        """
103
        The datetime, when the circuit breaker will try to recover
104
        :return: datetime
105
        """
106 1
        return self._opened + timedelta(seconds=self._recovery_timeout)
107
108 1
    @property
109
    def open_remaining(self):
110
        """
111
        Number of seconds remaining, the circuit breaker stays in OPEN state
112
        :return: int
113
        """
114 1
        return (self.open_until - datetime.utcnow()).total_seconds()
115
116 1
    @property
117
    def failure_count(self):
118 1
        return self._failure_count
119
120 1
    @property
121
    def closed(self):
122 1
        return self.state == STATE_CLOSED
123
124 1
    @property
125
    def opened(self):
126 1
        return self.state == STATE_OPEN
127
128 1
    @property
129
    def name(self):
130 1
        return self._name
131
132 1
    @property
133
    def last_failure(self):
134 1
        return self._last_failure
135
136 1
    @property
137
    def fallback_function(self):
138 1
        return self._fallback_function
139
140 1
    def __str__(self, *args, **kwargs):
141 1
        return self._name
142
143
144 1
class CircuitBreakerError(Exception):
    """
    Raised when a call is rejected because its circuit is OPEN.
    """

    def __init__(self, circuit_breaker, *args, **kwargs):
        """
        :param circuit_breaker: the breaker that rejected the call; its
            name, open_until, failure_count, open_remaining and last_failure
            are read when the error is rendered as text
        :param args: forwarded to ``Exception.__init__``
        :param kwargs: forwarded to ``Exception.__init__``
        """
        super(CircuitBreakerError, self).__init__(*args, **kwargs)
        self._circuit_breaker = circuit_breaker

    def __str__(self, *args, **kwargs):
        """
        :return: human readable summary of the rejecting circuit's state
        """
        breaker = self._circuit_breaker
        template = 'Circuit "%s" OPEN until %s (%d failures, %d sec remaining) (last_failure: %r)'
        details = (
            breaker.name,
            breaker.open_until,
            breaker.failure_count,
            # Remaining time is shown rounded to whole seconds.
            round(breaker.open_remaining),
            breaker.last_failure,
        )
        return template % details
163
164
165 1
class CircuitBreakerMonitor(object):
    """
    Registry giving access to every circuit breaker by name.

    All state lives on the class, so the registry is shared by every user of
    this class.
    """
    # Maps circuit name -> circuit breaker; a later registration under the
    # same name replaces the earlier one.
    circuit_breakers = {}

    @classmethod
    def register(cls, circuit_breaker):
        """
        Adds a circuit breaker to the registry under its ``name``.

        :param circuit_breaker: breaker to register
        """
        cls.circuit_breakers[circuit_breaker.name] = circuit_breaker

    @classmethod
    def all_closed(cls):
        # type: () -> bool
        """
        :return: True if no registered circuit is currently OPEN
        """
        # Stop at the first open circuit instead of collecting all of them.
        return not any(True for _ in cls.get_open())

    @classmethod
    def get_circuits(cls):
        # type: () -> Iterable[CircuitBreaker]
        """
        :return: all registered circuit breakers
        """
        return cls.circuit_breakers.values()

    @classmethod
    def get(cls, name):
        # type: (AnyStr) -> CircuitBreaker
        """
        :param name: name of the circuit to look up
        :return: the registered circuit breaker, or ``None`` if unknown
        """
        return cls.circuit_breakers.get(name)

    @classmethod
    def get_open(cls):
        # type: () -> Iterable[CircuitBreaker]
        """
        :return: generator over the registered circuits whose ``opened``
            property is true
        """
        for circuit in cls.get_circuits():
            if circuit.opened:
                yield circuit

    @classmethod
    def get_closed(cls):
        # type: () -> Iterable[CircuitBreaker]
        """
        :return: generator over the registered circuits whose ``closed``
            property is true
        """
        for circuit in cls.get_circuits():
            if circuit.closed:
                yield circuit
200
201
202 1
def circuit(failure_threshold=None,
            recovery_timeout=None,
            expected_exception=None,
            name=None,
            fallback_function=None,
            cls=CircuitBreaker):
    """
    Decorator factory for circuit breakers, usable with or without arguments.

    :param failure_threshold: failure threshold passed to ``cls``; when the
        decorator is applied bare (``@circuit``) this is the decorated
        function itself
    :param recovery_timeout: passed to ``cls``
    :param expected_exception: passed to ``cls``
    :param name: passed to ``cls``
    :param fallback_function: passed to ``cls``
    :param cls: circuit breaker class to instantiate
    :return: the decorated function for bare usage, otherwise a ``cls``
        instance that acts as the decorator
    """
    # Bare usage: the function to wrap arrives as the first argument, so a
    # breaker with default settings is created and applied immediately.
    if callable(failure_threshold):
        target = failure_threshold
        breaker = cls()
        return breaker.decorate(target)

    # Parameterised usage: hand the settings to the breaker class.
    settings = dict(
        failure_threshold=failure_threshold,
        recovery_timeout=recovery_timeout,
        expected_exception=expected_exception,
        name=name,
        fallback_function=fallback_function,
    )
    return cls(**settings)
220