Passed
Pull Request — develop (#7)
by
unknown
08:02
created

circuitbreaker.CircuitBreaker.decorate()   A

Complexity

Conditions 2

Size

Total Lines 14
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 8
nop 2
dl 0
loc 14
ccs 10
cts 10
cp 1
crap 2
rs 10
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2 1
from __future__ import unicode_literals
3 1
from __future__ import division
4 1
from __future__ import print_function
5 1
from __future__ import absolute_import
6
7 1
from functools import wraps
8 1
from datetime import datetime
9 1
from typing import AnyStr, Iterable
10
11 1
import ctypes
12 1
import multiprocessing
13 1
14
from circuitbreaker.stats import (
15
    record_circuit_breaker_state, record_circuit_breaker_success_total,
16 1
    record_circuit_breaker_failure_total)
17 1
18 1
# Circuit breaker states. They are byte strings because the current state is
# stored in a ctypes-backed shared value (see CircuitBreaker.__init__).
#
# closed:    calls are passed through to the wrapped function.
# open:      calls are rejected with CircuitBreakerError.
# half-open: never stored; reported by CircuitBreaker.state once an open
#            breaker's recovery timeout has elapsed, so the next call is
#            let through as a trial.
STATE_CLOSED = b'closed'
STATE_OPEN = b'open'
STATE_HALF_OPEN = b'half-open'
21 1
22
# Naive datetime for the Unix epoch (1970-01-01 00:00:00). Built directly
# instead of via datetime.utcfromtimestamp(0), which yields the same value
# but is deprecated in recent Python versions.
EPOCH = datetime(1970, 1, 1)


def unix_time_seconds(dt):
    """
    Convert a naive datetime into seconds elapsed since ``EPOCH``.

    :param dt: naive ``datetime``; it is subtracted from the naive ``EPOCH``,
               so a timezone-aware value raises ``TypeError``
    :return: float number of seconds (fractional part carries microseconds)
    """
    return (dt - EPOCH).total_seconds()
27 1
28 1
29 1
class CircuitBreaker(object):
    """
    Guards calls to a function and stops calling it after repeated failures.

    Each expected exception raised by the wrapped function increments a
    failure counter. Once the counter reaches the failure threshold the
    breaker is OPEN and calls are rejected with ``CircuitBreakerError``.
    After the recovery timeout has elapsed the breaker reports HALF_OPEN and
    lets the next call through; a success closes the breaker and resets the
    counter, a failure re-opens it and restarts the timeout.

    The counter, state and opening timestamp live in ``multiprocessing``
    shared objects guarded by a single ``multiprocessing.RLock``.
    """
    FAILURE_THRESHOLD = 5
    RECOVERY_TIMEOUT = 30
    EXPECTED_EXCEPTION = Exception

    # Size in bytes of the shared state buffer; must be able to hold the
    # longest STATE_* value.
    _STATE_BUFFER_SIZE = 16

    def __init__(self,
                 failure_threshold=None,
                 recovery_timeout=None,
                 expected_exception=None,
                 name=None):
        """
        Falsy values for the first three parameters (``None``, but also
        ``0``) fall back to the corresponding class-level default.

        :param failure_threshold: The minimum number of failures before opening circuit
        :param recovery_timeout: The number of seconds to elapse before circuit
                                 can be considered in HALF_OPEN state
        :param expected_exception: Any exception expected from the external network call
        :param name: The name of the circuit breaker; when omitted it is
                     taken from the first decorated function
        """
        self._lock = multiprocessing.RLock()
        self._failure_count = multiprocessing.Value(ctypes.c_int, 0, lock=self._lock)
        self._failure_threshold = failure_threshold or self.FAILURE_THRESHOLD
        self._recovery_timeout = recovery_timeout or self.RECOVERY_TIMEOUT
        self._expected_exception = expected_exception or self.EXPECTED_EXCEPTION
        self._name = name
        # Keep the state bytes themselves in shared memory. A shared
        # ``c_char_p`` would only hold a pointer into one process' address
        # space, which is not meaningful to any other process.
        self._state = multiprocessing.Array(
            ctypes.c_char, self._STATE_BUFFER_SIZE, lock=self._lock)
        self._state.value = STATE_CLOSED
        self._opened = multiprocessing.Value(
            ctypes.c_double, unix_time_seconds(datetime.utcnow()), lock=self._lock)

    def __call__(self, wrapped):
        """Allow an instance to be used directly as a decorator."""
        return self.decorate(wrapped)

    def decorate(self, function):
        """
        Applies the circuit breaker to a function

        If the breaker has no name yet, the function's ``__name__`` is used.
        The breaker is registered with ``CircuitBreakerMonitor`` as a side
        effect.

        :param function: callable to protect
        :return: wrapper that routes every invocation through ``call``
        """
        if self._name is None:
            self._name = function.__name__

        CircuitBreakerMonitor.register(self)

        @wraps(function)
        def wrapper(*args, **kwargs):
            return self.call(function, *args, **kwargs)

        return wrapper

    def call(self, func, *args, **kwargs):
        """
        Calls the decorated function and applies the circuit breaker
        rules on success or failure

        :param func: Decorated function
        :param args: positional arguments forwarded to ``func``
        :param kwargs: keyword arguments forwarded to ``func``
        :return: whatever ``func`` returns
        :raises CircuitBreakerError: if the breaker is open; ``func`` is not
                                     called in that case
        """
        try:
            if self.opened:
                # Rejected calls are reported as failures but do not touch
                # the failure counter.
                record_circuit_breaker_failure_total(self._name, self._state.value)
                raise CircuitBreakerError(self)
            try:
                result = func(*args, **kwargs)
            except self._expected_exception:
                self.__call_failed()
                raise

            self.__call_succeeded()
            return result
        finally:
            # Always record the last known state of the circuit breaker
            record_circuit_breaker_state(self._name, self.state)

    def __call_succeeded(self):
        """
        Close circuit after successful execution and reset failure count
        """
        # Record the state of circuit breaker when a successful remote call
        # took place
        record_circuit_breaker_success_total(self._name, self._state.value)

        with self._lock:
            self._state.value = STATE_CLOSED
            self._failure_count.value = 0

    def __call_failed(self):
        """
        Count failure and open circuit, if threshold has been reached
        """
        # Record the state of circuit breaker when an unsuccessful remote call
        # took place
        record_circuit_breaker_failure_total(self._name, self._state.value)

        with self._lock:
            self._failure_count.value += 1
            if self._failure_count.value >= self._failure_threshold:
                self._state.value = STATE_OPEN
                # (Re)start the recovery timeout from now.
                self._opened.value = unix_time_seconds(datetime.utcnow())

    @property
    def state(self):
        """
        Effective state of the breaker.

        :return: ``STATE_HALF_OPEN`` when the stored state is open and the
                 recovery timeout has elapsed, otherwise the stored state
        """
        if self._state.value == STATE_OPEN and self.open_remaining <= 0:
            return STATE_HALF_OPEN

        return self._state.value

    @property
    def open_until(self):
        """
        The epoch of when the circuit breaker will try to recover
        :return: epoch float
        """
        return self._opened.value + self._recovery_timeout

    @property
    def open_remaining(self):
        """
        Number of seconds remaining, the circuit breaker stays in OPEN state
        :return: float; zero or negative once the timeout has elapsed
        """
        return self.open_until - unix_time_seconds(datetime.utcnow())

    @property
    def failure_count(self):
        """Current value of the failure counter."""
        return self._failure_count.value

    @property
    def closed(self):
        """True when the effective state is ``STATE_CLOSED``."""
        return self.state == STATE_CLOSED

    @property
    def opened(self):
        """True when the effective state is ``STATE_OPEN`` (not half-open)."""
        return self.state == STATE_OPEN

    @property
    def name(self):
        """Name of the breaker, or ``None`` if none has been assigned yet."""
        return self._name

    def __str__(self, *args, **kwargs):
        """
        Return the breaker name.

        A breaker that has not been named yet yields ``'None'``; returning
        the ``None`` object itself from ``__str__`` raises ``TypeError``.
        """
        label = self._name
        return label if label is not None else 'None'
164
165 1
166
class CircuitBreakerError(Exception):
    """Raised in place of calling the protected function while a breaker is open."""

    def __init__(self, circuit_breaker, *args, **kwargs):
        """
        :param circuit_breaker: the breaker that rejected the call; its
                                ``name``, ``open_until``, ``failure_count``
                                and ``open_remaining`` are read when the
                                error is rendered as a string
        :param args: forwarded to ``Exception.__init__``
        :param kwargs: forwarded to ``Exception.__init__``
        """
        super(CircuitBreakerError, self).__init__(*args, **kwargs)
        self._circuit_breaker = circuit_breaker

    def __str__(self, *args, **kwargs):
        """
        Describe the rejecting breaker.

        The values are read from the breaker at formatting time, not at the
        time the error was raised.
        """
        breaker = self._circuit_breaker
        return 'Circuit "%s" OPEN until %s (%d failures, %d sec remaining)' % (
            breaker.name,
            breaker.open_until,
            breaker.failure_count,
            round(breaker.open_remaining)
        )
184 1
185
186
class CircuitBreakerMonitor(object):
    """
    Registry of circuit breakers, keyed by breaker name.

    The registry is a class-level dict, so every caller of these class
    methods sees the same set of breakers.
    """
    circuit_breakers = {}

    @classmethod
    def register(cls, circuit_breaker):
        """
        Store a breaker under its ``name``.

        A breaker registered earlier under the same name is replaced.
        """
        cls.circuit_breakers[circuit_breaker.name] = circuit_breaker

    @classmethod
    def all_closed(cls):
        # type: () -> bool
        """
        Return True when no registered breaker reports ``opened``.

        Stops at the first open breaker instead of collecting all of them.
        """
        return not any(True for _ in cls.get_open())

    @classmethod
    def get_circuits(cls):
        # type: () -> Iterable[CircuitBreaker]
        """Return all registered breakers."""
        return cls.circuit_breakers.values()

    @classmethod
    def get(cls, name):
        # type: (AnyStr) -> CircuitBreaker
        """Return the breaker registered under ``name``, or None if there is none."""
        return cls.circuit_breakers.get(name)

    @classmethod
    def get_open(cls):
        # type: () -> Iterable[CircuitBreaker]
        """Yield every registered breaker whose ``opened`` is true."""
        for breaker in cls.get_circuits():
            if breaker.opened:
                yield breaker

    @classmethod
    def get_closed(cls):
        # type: () -> Iterable[CircuitBreaker]
        """Yield every registered breaker whose ``closed`` is true."""
        for breaker in cls.get_circuits():
            if breaker.closed:
                yield breaker
221
222
223
def circuit(failure_threshold=None,
            recovery_timeout=None,
            expected_exception=None,
            name=None,
            cls=CircuitBreaker):
    """
    Decorator entry point; usable both as ``@circuit`` and ``@circuit(...)``.

    :param failure_threshold: passed to ``cls``; when the decorator is used
                              without parentheses this is the decorated
                              function itself
    :param recovery_timeout: passed to ``cls``
    :param expected_exception: passed to ``cls``
    :param name: passed to ``cls``
    :param cls: breaker class to instantiate
    :return: the wrapped function when used bare, otherwise a ``cls``
             instance, which is itself applied as the decorator
    """
    # if the decorator is used without parameters, the
    # wrapped function is provided as first argument
    if callable(failure_threshold):
        return cls().decorate(failure_threshold)

    return cls(
        failure_threshold=failure_threshold,
        recovery_timeout=recovery_timeout,
        expected_exception=expected_exception,
        name=name)
239