# Completed
# Push — master ( a8e749...c776c0 )
# by Fabian
# 42s
# created
#
# CircuitBreakerMonitor   A
#
# Complexity
#
# Total Complexity 11
#
# Size/Duplication
#
# Total Lines 32
# Duplicated Lines 0 %
# Metric Value
# dl 0
# loc 32
# rs 10
# wmc 11
#
# 6 Methods
#
# Rating   Name   Duplication   Size   Complexity
# A get_circuits() 0 3 1
# A get_open() 0 5 3
# A register() 0 3 1
# A get_closed() 0 5 3
# A all_closed() 0 5 2
# A get() 0 3 1
1
from functools import wraps
2
from datetime import datetime, timedelta
3
4
# States a circuit breaker can be in.
STATE_CLOSED = 'closed'  # normal operation: calls are passed through
STATE_OPEN = 'open'  # failure threshold reached: calls are rejected
STATE_HALF_OPEN = 'half_open'  # recovery timeout elapsed: a trial call is allowed
7
8
9
class CircuitBreaker(object):
    """
    Decorator that guards a function with a circuit breaker.

    While CLOSED, calls are passed through and failures are counted. Once the
    failure threshold is reached the circuit OPENs and calls are rejected with
    a CircuitBreakerError. After the recovery timeout the circuit becomes
    HALF_OPEN and lets one trial call through: success closes the circuit,
    failure opens it again.
    """

    def __init__(self, expected_exception=Exception, failure_threshold=5, recover_timeout=30, name=None):
        """
        :param expected_exception: Exception class (or tuple of classes) that counts as a failure
        :param failure_threshold: Number of failures since the last success that opens the circuit
        :param recover_timeout: Seconds the circuit stays OPEN before a trial call is allowed
        :param name: Name of the circuit; defaults to the decorated function's name
        """
        self._expected_exception = expected_exception
        self._failure_count = 0
        self._failure_threshold = failure_threshold
        self._recover_timeout = recover_timeout
        self._state = STATE_CLOSED
        # Naive UTC timestamp; open_until / open_remaining are derived from it.
        self._opened = datetime.utcnow()
        self._name = name

    def __call__(self, wrapped):
        """
        Applies the circuit breaker decorator to a function

        The circuit is registered with the CircuitBreakerMonitor under its name.

        :param wrapped: Function to protect
        :return: Wrapper that routes every call through call()
        """
        if self._name is None:
            self._name = wrapped.__name__

        CircuitBreakerMonitor.register(self)

        @wraps(wrapped)
        def wrapper(*args, **kwargs):
            return self.call(wrapped, *args, **kwargs)

        return wrapper

    def call(self, func, *args, **kwargs):
        """
        Calls the decorated function and applies the circuit breaker rules on success or failure

        :param func: Decorated function
        :param args: Positional arguments passed to func
        :param kwargs: Keyword arguments passed to func
        :return: Whatever func returns
        :raises CircuitBreakerError: If the circuit currently rejects calls
        """
        if not self.__is_closed():
            raise CircuitBreakerError(self)
        try:
            result = func(*args, **kwargs)
        except self._expected_exception:
            self.__failure()
            raise
        except BaseException:
            # Not a monitored failure, so it is neither counted nor treated as
            # success. A trial call must not leave the circuit stranded in
            # HALF_OPEN though (nothing ever leaves that state on its own):
            # fall back to OPEN and keep the original timestamp, so the next
            # call is allowed as a new trial.
            if self._state == STATE_HALF_OPEN:
                self._state = STATE_OPEN
            raise

        self.__success()
        return result

    def __is_closed(self):
        """
        Check if a call may be executed

        Set state to HALF_OPEN and allow the next execution, if recovery timeout has been reached

        :return: True if the call may proceed, False if it must be rejected
        """
        if self._state == STATE_OPEN and self.open_remaining <= 0:
            self._state = STATE_HALF_OPEN
            return True

        return self._state == STATE_CLOSED

    def __success(self):
        """
        Close circuit after successful execution
        """
        self.close()

    def __failure(self):
        """
        Count failure and open circuit, if threshold has been reached

        A failed trial call (state HALF_OPEN) always re-opens the circuit, even
        if the failure count is still below the threshold (e.g. after a manual
        open()); otherwise the circuit would stay HALF_OPEN and reject forever.
        """
        self._failure_count += 1
        if self._state == STATE_HALF_OPEN or self._failure_count >= self._failure_threshold:
            self.open()

    def open(self):
        """
        Open the circuit breaker and restart the recovery timeout
        """
        self._state = STATE_OPEN
        self._opened = datetime.utcnow()

    def close(self):
        """
        Close the circuit breaker and reset the failure count
        """
        self._state = STATE_CLOSED
        self._failure_count = 0

    @property
    def open_until(self):
        """
        The datetime, when the circuit breaker will try to recover

        :return: datetime (naive, UTC)
        """
        return self._opened + timedelta(seconds=self._recover_timeout)

    @property
    def open_remaining(self):
        """
        Number of seconds remaining, the circuit breaker stays in OPEN state

        Zero or negative once the recovery timeout has elapsed.

        :return: float
        """
        return (self.open_until - datetime.utcnow()).total_seconds()

    @property
    def failure_count(self):
        """
        Number of failures counted since the circuit was last closed
        """
        return self._failure_count

    @property
    def closed(self):
        """
        True unless the circuit is OPEN (HALF_OPEN is reported as closed)
        """
        return self._state != STATE_OPEN

    @property
    def name(self):
        """
        Name of the circuit (None until set explicitly or by decorating a function)
        """
        return self._name

    @property
    def state(self):
        """
        Current state: one of STATE_CLOSED, STATE_OPEN, STATE_HALF_OPEN
        """
        return self._state

    def __str__(self, *args, **kwargs):
        # str() keeps this valid while the name is still None.
        return str(self._name)
123
124
125
class CircuitBreakerError(Exception):
    """
    Raised in place of the protected call while a circuit breaker rejects calls.
    """

    def __init__(self, circuit_breaker, *args, **kwargs):
        """
        :param circuit_breaker: The circuit breaker that rejected the call
        :param args: Positional arguments forwarded to Exception
        :param kwargs: Keyword arguments forwarded to Exception
        """
        super().__init__(*args, **kwargs)
        self._circuit_breaker = circuit_breaker

    def __str__(self, *args, **kwargs):
        """
        Describe the circuit: name, recovery time, failure count and seconds remaining
        """
        return 'CIRCUIT "%s" OPEN until %s (%d failures, %d sec remaining)' % (
            self._circuit_breaker.name,
            self._circuit_breaker.open_until,
            self._circuit_breaker.failure_count,
            round(self._circuit_breaker.open_remaining)
        )
143
144
145
class CircuitBreakerMonitor(object):
    """
    Registry of circuit breakers, keyed by name.

    The registry is a class attribute, so it is shared by everything that uses
    this class. Registered objects are expected to expose ``name`` and
    ``closed`` (as CircuitBreaker does).
    """

    # name -> circuit breaker
    circuit_breakers = {}

    @classmethod
    def register(cls, circuit_breaker):
        """
        Add a circuit breaker; an existing entry with the same name is replaced

        :param circuit_breaker: Circuit breaker to register
        """
        cls.circuit_breakers[circuit_breaker.name] = circuit_breaker

    @classmethod
    def all_closed(cls):
        """
        :return: True if no registered circuit is open (also when none are registered)
        """
        return all(circuit.closed for circuit in cls.get_circuits())

    @classmethod
    def get_circuits(cls):
        """
        :return: View of all registered circuit breakers
        """
        return cls.circuit_breakers.values()

    @classmethod
    def get(cls, name):
        """
        :param name: Name of the circuit breaker
        :return: The registered circuit breaker, or None if the name is unknown
        """
        return cls.circuit_breakers.get(name)

    @classmethod
    def get_open(cls):
        """
        :return: Generator over the registered circuits that are open
        """
        for circuit in cls.get_circuits():
            if not circuit.closed:
                yield circuit

    @classmethod
    def get_closed(cls):
        """
        :return: Generator over the registered circuits that are not open
        """
        for circuit in cls.get_circuits():
            if circuit.closed:
                yield circuit
177