Test Failed
Branch develop (2ccc44)
by Fabian
02:05
created

circuitbreaker.CircuitBreakerMonitor.get_closed()   A

Complexity

Conditions 3

Size

Total Lines 6
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 3

Importance

Changes 0
Metric Value
cc 3
eloc 5
nop 1
dl 0
loc 6
rs 10
c 0
b 0
f 0
ccs 4
cts 4
cp 1
crap 3
1
# -*- coding: utf-8 -*-
2 1
from __future__ import unicode_literals
3 1
from __future__ import division
4 1
from __future__ import print_function
5 1
from __future__ import absolute_import
6
7 1
from functools import wraps
8 1
from datetime import datetime, timedelta
9 1
from typing import AnyStr, Iterable
10
11 1
STATE_CLOSED = 'closed'
12 1
STATE_OPEN = 'open'
13 1
STATE_HALF_OPEN = 'half_open'
14
15
16 1
class CircuitBreaker(object):
17 1
    FAILURE_THRESHOLD = 5
18 1
    RECOVERY_TIMEOUT = 30
19 1
    EXPECTED_EXCEPTION = Exception
20
21 1
    def __init__(self,
22
                 failure_threshold=None,
23
                 recovery_timeout=None,
24
                 expected_exception=None,
25
                 name=None):
26 1
        self._failure_count = 0
27 1
        self._failure_threshold = failure_threshold or self.FAILURE_THRESHOLD
28 1
        self._recovery_timeout = recovery_timeout or self.RECOVERY_TIMEOUT
29 1
        self._expected_exception = expected_exception or self.EXPECTED_EXCEPTION
30 1
        self._name = name
31 1
        self._state = STATE_CLOSED
32 1
        self._opened = datetime.utcnow()
33
34 1
    def __call__(self, wrapped):
35 1
        return self.decorate(wrapped)
36
37 1
    def decorate(self, function):
38
        """
39
        Applies the circuit breaker to a function
40
        """
41 1
        if self._name is None:
42 1
            self._name = function.__name__
43
44 1
        CircuitBreakerMonitor.register(self)
45
46 1
        @wraps(function)
47
        def wrapper(*args, **kwargs):
48 1
            return self.call(function, *args, **kwargs)
49
50 1
        return wrapper
51
52 1
    def call(self, func, *args, **kwargs):
53
        """
54
        Calls the decorated function and applies the circuit breaker
55
        rules on success or failure
56
        :param func: Decorated function
57
        """
58 1
        if self.opened:
59 1
            raise CircuitBreakerError(self)
60 1
        try:
61 1
            result = func(*args, **kwargs)
62 1
        except self._expected_exception:
63 1
            self.__call_failed()
64 1
            raise
65
66 1
        self.__call_succeeded()
67 1
        return result
68
69 1
    def __call_succeeded(self):
70
        """
71
        Close circuit after successful execution and reset failure count
72
        """
73 1
        self._state = STATE_CLOSED
74 1
        self._failure_count = 0
75
76 1
    def __call_failed(self):
77
        """
78
        Count failure and open circuit, if threshold has been reached
79
        """
80 1
        self._failure_count += 1
81 1
        if self._failure_count >= self._failure_threshold:
82 1
            self._state = STATE_OPEN
83 1
            self._opened = datetime.utcnow()
84
85 1
    @property
86
    def state(self):
87 1
        if self._state == STATE_OPEN and self.open_remaining <= 0:
88 1
            return STATE_HALF_OPEN
89 1
        return self._state
90
91 1
    @property
92
    def open_until(self):
93
        """
94
        The datetime, when the circuit breaker will try to recover
95
        :return: datetime
96
        """
97 1
        return self._opened + timedelta(seconds=self._recovery_timeout)
98
99 1
    @property
100
    def open_remaining(self):
101
        """
102
        Number of seconds remaining, the circuit breaker stays in OPEN state
103
        :return: int
104
        """
105 1
        return (self.open_until - datetime.utcnow()).total_seconds()
106
107 1
    @property
108
    def failure_count(self):
109 1
        return self._failure_count
110
111 1
    @property
112
    def closed(self):
113 1
        return self.state == STATE_CLOSED
114
115 1
    @property
116
    def opened(self):
117 1
        return self.state == STATE_OPEN
118
119 1
    @property
120
    def name(self):
121 1
        return self._name
122
123 1
    def __str__(self, *args, **kwargs):
124 1
        return self._name
125
126
127 1
class CircuitBreakerError(Exception):
128 1
    def __init__(self, circuit_breaker, *args, **kwargs):
129
        """
130
        :param circuit_breaker:
131
        :param args:
132
        :param kwargs:
133
        :return:
134
        """
135 1
        super(CircuitBreakerError, self).__init__(*args, **kwargs)
136 1
        self._circuit_breaker = circuit_breaker
137
138 1
    def __str__(self, *args, **kwargs):
139 1
        return 'Circuit "%s" OPEN until %s (%d failures, %d sec remaining)' % (
140
            self._circuit_breaker.name,
141
            self._circuit_breaker.open_until,
142
            self._circuit_breaker.failure_count,
143
            round(self._circuit_breaker.open_remaining)
144
        )
145
146
147 1
class CircuitBreakerMonitor(object):
148 1
    circuit_breakers = {}
149
150 1
    @classmethod
151
    def register(cls, circuit_breaker):
152 1
        cls.circuit_breakers[circuit_breaker.name] = circuit_breaker
153
154 1
    @classmethod
155
    def all_closed(cls):
156
        # type: () -> bool
157 1
        return len(list(cls.get_open())) == 0
158
159 1
    @classmethod
160
    def get_circuits(cls):
161
        # type: () -> Iterable[CircuitBreaker]
162 1
        return cls.circuit_breakers.values()
163
164 1
    @classmethod
165
    def get(cls, name):
166
        # type: (AnyStr) -> CircuitBreaker
167 1
        return cls.circuit_breakers.get(name)
168
169 1
    @classmethod
170
    def get_open(cls):
171
        # type: () -> Iterable[CircuitBreaker]
172 1
        for circuit in cls.get_circuits():
173 1
            if circuit.opened:
174 1
                yield circuit
175
176 1
    @classmethod
177
    def get_closed(cls):
178
        # type: () -> Iterable[CircuitBreaker]
179 1
        for circuit in cls.get_circuits():
180 1
            if circuit.closed:
181 1
                yield circuit
182
183
184 1
def circuit(failure_threshold=None,
185
            recovery_timeout=None,
186
            expected_exception=None,
187
            name=None,
188
            cls=CircuitBreaker):
189
190
    # if the decorator is used without parameters, the
191
    # wrapped function is provided as first argument
192 1
    if callable(failure_threshold):
193 1
        return cls().decorate(failure_threshold)
194
    else:
195 1
        return cls(
196
            failure_threshold=failure_threshold,
197
            recovery_timeout=recovery_timeout,
198
            expected_exception=expected_exception,
199
            name=name)
200