Completed
Push — master ( 874bf3...20838a )
by Ionel Cristian
54s
created

src.redis_lock.Lock.extend()   A

Complexity

Conditions 3

Size

Total Lines 13

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 3
dl 0
loc 13
rs 9.4286
1
import threading
2
from logging import getLogger
3
from os import urandom
4
from hashlib import sha1
5
6
from redis import StrictRedis
7
from redis.exceptions import NoScriptError
8
9
__version__ = "2.3.0"

# Module-level logger shared by the lock implementation.
logger = getLogger(__name__)

# Lua script executed on release.
#   KEYS[1] - the lock key
#   KEYS[2] - the signal list that blocked acquirers BLPOP on
#   ARGV[1] - the id the releasing lock instance stored in the lock key
# The lock key is only deleted when it still holds ARGV[1]; in that case the
# signal list is reset to a single element first. When the ids differ the
# script changes nothing and returns 0.
UNLOCK_SCRIPT = b"""
    if redis.call("get", KEYS[1]) == ARGV[1] then
        redis.call("del", KEYS[2])
        redis.call("lpush", KEYS[2], 1)
        return redis.call("del", KEYS[1])
    else
        return 0
    end
"""
# SHA1 hex digest of the script source, used to invoke it through EVALSHA.
UNLOCK_SCRIPT_HASH = sha1(UNLOCK_SCRIPT).hexdigest()
23
24
25
class AlreadyAcquired(RuntimeError):
    """Raised by ``Lock.acquire()`` when the instance already holds the lock."""
27
28
29
class NotAcquired(RuntimeError):
    """Raised when releasing a lock that this instance did not acquire."""
31
32
33
class AlreadyStarted(RuntimeError):
    """Raised when the lock renewal thread is started while one already exists."""
35
36
37
class TimeoutNotUsable(RuntimeError):
    """Raised by ``Lock.acquire()`` when a timeout is given with ``blocking=False``."""
39
40
41
class InvalidTimeout(RuntimeError):
    """Raised by ``Lock.acquire()`` when the timeout is less than or equal to 0."""
43
44
45
class TimeoutTooLarge(RuntimeError):
    """Raised by ``Lock.acquire()`` when the timeout exceeds the lock's expiry."""
47
48
49
class NotExpirable(RuntimeError):
    """Raised by ``Lock.extend()`` when the lock was created without an expiry."""
51
52
53
class Lock(object):
    """
    A Lock context manager implemented via redis SETNX/BLPOP.

    The lock is a redis key ``lock:<name>`` holding the owner's id. Waiting
    acquirers block on the list ``lock-signal:<name>``, which is pushed to
    when the lock is released.
    """

    # Lua script used by extend(): the TTL is only refreshed while the lock
    # key still holds this instance's id. A plain ``SET ... XX`` would
    # overwrite the id of another client that acquired the lock after ours
    # expired, silently stealing it.
    #   KEYS[1] - the lock key
    #   ARGV[1] - the id of this lock instance
    #   ARGV[2] - the new expiry in seconds
    _EXTEND_SCRIPT = b"""
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("expire", KEYS[1], ARGV[2])
        else
            return 0
        end
    """
    _EXTEND_SCRIPT_HASH = sha1(_EXTEND_SCRIPT).hexdigest()

    def __init__(self, redis_client, name, expire=None, id=None, auto_renewal=False):
        """
        :param redis_client:
            An instance of :class:`~StrictRedis`.
        :param name:
            The name (redis key) the lock should have.
        :param expire:
            The lock expiry time in seconds. If left at the default (None)
            the lock will not expire.
        :param id:
            The ID (redis value) the lock should have. A random value is
            generated when left at the default.
        :param auto_renewal:
            If set to True, Lock will automatically renew the lock so that it
            doesn't expire for as long as the lock is held (acquire() called
            or running in a context manager).

            Implementation note: Renewal will happen using a daemon thread with
            an interval of expire*2/3. If wishing to use a different renewal
            time, subclass Lock, call super().__init__() then set
            self._lock_renewal_interval to your desired interval.
        :raises ValueError:
            If ``auto_renewal`` is set but ``expire`` is None.
        """
        assert isinstance(redis_client, StrictRedis)
        if auto_renewal and expire is None:
            raise ValueError("Expire may not be None when auto_renewal is set")

        self._client = redis_client
        self._expire = expire if expire is None else int(expire)
        self._id = urandom(16) if id is None else id
        self._held = False
        self._name = 'lock:'+name
        self._signal = 'lock-signal:'+name
        # Derive the interval from the integer TTL actually sent to redis and
        # force float arithmetic: under Python 2 integer division would turn
        # expire=1 into an interval of 0, i.e. a busy loop in the renewer.
        self._lock_renewal_interval = float(self._expire) * 2 / 3 if auto_renewal else None
        self._lock_renewal_thread = None

    def reset(self):
        """
        Forcibly deletes the lock. Use this with care.

        Both the lock key and the signal list are deleted, regardless of
        which id currently owns the lock.
        """
        self._client.delete(self._name)
        self._client.delete(self._signal)

    @property
    def id(self):
        """The id (redis value) this instance stores in the lock key."""
        return self._id

    def get_owner_id(self):
        """Return the value currently stored in the lock key, as returned by GET."""
        return self._client.get(self._name)

    def acquire(self, blocking=True, timeout=None):
        """
        :param blocking:
            Boolean value specifying whether lock should be blocking or not.
        :param timeout:
            An integer value specifying the maximum number of seconds to block.
        :returns:
            True when the lock was acquired. False when ``blocking`` is False
            and the lock is busy, or when ``timeout`` elapsed without getting
            the lock.
        :raises AlreadyAcquired:
            If this instance already holds the lock.
        :raises TimeoutNotUsable:
            If a timeout is given together with ``blocking=False``.
        :raises InvalidTimeout:
            If the timeout is less than or equal to 0.
        :raises TimeoutTooLarge:
            If the timeout is greater than the lock's expiry.
        """
        logger.debug("Getting %r ...", self._name)

        if self._held:
            raise AlreadyAcquired("Already acquired from this Lock instance.")

        if not blocking and timeout is not None:
            raise TimeoutNotUsable("Timeout cannot be used if blocking=False")

        timeout = timeout if timeout is None else int(timeout)
        if timeout is not None and timeout <= 0:
            raise InvalidTimeout("Timeout (%d) cannot be less than or equal to 0" % timeout)

        if timeout and self._expire and timeout > self._expire:
            raise TimeoutTooLarge("Timeout (%d) cannot be greater than expire (%d)" % (timeout, self._expire))

        busy = True
        # Without an explicit timeout the wait is capped at the expiry so the
        # SET is retried once the current holder's key may have expired;
        # 0 makes BLPOP block until a signal arrives.
        blpop_timeout = timeout or self._expire or 0
        timed_out = False
        while busy:
            busy = not self._client.set(self._name, self._id, nx=True, ex=self._expire)
            if busy:
                if timed_out:
                    return False
                elif blocking:
                    signalled = self._client.blpop(self._signal, blpop_timeout)
                    # Only give up when the caller asked for a timeout. A
                    # BLPOP that merely ran into the expiry-based cap must
                    # retry, otherwise a blocking acquire could return False.
                    timed_out = timeout is not None and not signalled
                else:
                    logger.debug("Failed to get %r.", self._name)
                    return False

        logger.debug("Got lock for %r.", self._name)
        self._held = True
        if self._lock_renewal_interval is not None:
            self._start_lock_renewer()
        return True

    def extend(self, expire=None):
        """Extends expiration time of the lock.

        The expiry is only changed while the lock key still holds this
        instance's id; a lock that expired and is now owned by somebody
        else is left untouched.

        :param expire:
            New expiration time. If ``None`` - `expire` provided during
            lock initialization will be taken.
        :returns:
            True if the expiry was updated, False if the lock key does not
            exist or is owned by a different id.
        :raises NotExpirable:
            If the lock was created without an expiry time.
        """
        if self._expire is None:
            raise NotExpirable('The lock has no expiry time, so extending it '
                               'makes no sense.')
        expire = self._expire if expire is None else int(expire)
        try:
            extended = self._client.evalsha(self._EXTEND_SCRIPT_HASH, 1, self._name, self._id, expire)
        except NoScriptError:
            logger.warning("EXTEND_SCRIPT not cached.")
            extended = self._client.eval(self._EXTEND_SCRIPT, 1, self._name, self._id, expire)
        return bool(extended)

    def _lock_renewer(self, interval):
        """
        Renew the lock key in redis every `interval` seconds for as long
        as `self._lock_renewal_thread.should_exit` is False.

        Renewal also stops as soon as the lock turns out to be no longer
        owned by this instance.
        """
        log = getLogger("%s.lock_refresher" % __name__)
        while not self._lock_renewal_thread.wait_for_exit_request(timeout=interval):
            log.debug("Refreshing lock")
            if not self.extend(expire=self._expire):
                log.warning("Lock %r is no longer owned by this instance, stopping lock refreshing", self._name)
                return
        log.debug("Exit requested, stopping lock refreshing")

    def _start_lock_renewer(self):
        """
        Starts the lock refresher thread.

        :raises AlreadyStarted:
            If a refresher thread is already registered on this instance.
        """
        if self._lock_renewal_thread is not None:
            raise AlreadyStarted("Lock refresh thread already started")

        logger.debug(
            "Starting thread to refresh lock every %s seconds",
            self._lock_renewal_interval
        )
        self._lock_renewal_thread = InterruptableThread(
            group=None,
            target=self._lock_renewer,
            kwargs={'interval': self._lock_renewal_interval}
        )
        self._lock_renewal_thread.daemon = True
        self._lock_renewal_thread.start()

    def _stop_lock_renewer(self):
        """
        Stop the lock renewer.

        This signals the renewal thread and waits for its exit.
        """
        renewer = self._lock_renewal_thread
        if renewer is None:
            return
        if renewer.is_alive():
            logger.debug("Signalling the lock refresher to stop")
            renewer.request_exit()
            renewer.join()
            logger.debug("Lock refresher has stopped")
        # Always drop the reference, even when the thread already died on its
        # own; a stale reference would make the next acquire() raise
        # AlreadyStarted.
        self._lock_renewal_thread = None

    def __enter__(self):
        acquired = self.acquire(blocking=True)
        assert acquired, "Lock wasn't acquired, but blocking=True"
        return self

    def __exit__(self, exc_type=None, exc_value=None, traceback=None, force=False):
        """
        Release the lock: stop the renewal thread (if any) and run the
        unlock script, which only deletes the key while it holds this
        instance's id.

        :raises NotAcquired:
            If this instance does not hold the lock and ``force`` is False.
        """
        if not (self._held or force):
            raise NotAcquired("This Lock instance didn't acquire the lock.")
        if self._lock_renewal_thread is not None:
            self._stop_lock_renewer()
        logger.debug("Releasing %r.", self._name)
        try:
            self._client.evalsha(UNLOCK_SCRIPT_HASH, 2, self._name, self._signal, self._id)
        except NoScriptError:
            logger.warning("UNLOCK_SCRIPT not cached.")
            self._client.eval(UNLOCK_SCRIPT, 2, self._name, self._signal, self._id)
        self._held = False

    def release(self, force=False):
        """Releases the lock, that was acquired in the same Python context.

        :param force:
            If ``False`` - fail with exception if this instance was not in
            acquired state in the same Python context.
            If ``True`` - fail silently.
        """
        return self.__exit__(force=force)
234
235
236
class InterruptableThread(threading.Thread):
    """
    A Python thread that can be requested to stop by calling request_exit()
    on it.

    Code running inside this thread should periodically check the
    `should_exit` property (or use wait_for_exit_request) on the thread
    object and stop further processing once it returns True.
    """
    def __init__(self, *args, **kwargs):
        # Event that is set once an exit has been requested.
        self._should_exit = threading.Event()
        super(InterruptableThread, self).__init__(*args, **kwargs)

    def request_exit(self):
        """
        Signal the thread that it should stop performing more work and exit.
        """
        self._should_exit.set()

    @property
    def should_exit(self):
        """True once request_exit() has been called."""
        return self._should_exit.is_set()

    def wait_for_exit_request(self, timeout=None):
        """
        Wait until the thread has been signalled to exit.

        If timeout is specified (as a float of seconds to wait) then wait
        up to this many seconds before returning the value of `should_exit`.
        """
        should_exit = self._should_exit.wait(timeout)
        if should_exit is None:
            # Python 2.6 compatibility which doesn't return self.__flag when
            # calling Event.wait()
            should_exit = self.should_exit
        return should_exit
272
273
274
def reset_all(redis_client):
    """
    Forcibly delete all locks that remain (for example after a crash).
    Use this with care.

    Every key matching ``lock:*`` is deleted first, then every key matching
    ``lock-signal:*``, regardless of who owns the locks.

    :param redis_client:
        The redis client used to list and delete the keys.
    """
    for pattern in ('lock:*', 'lock-signal:*'):
        # Keys are deleted one at a time, as a separate DEL per key.
        for lock_key in redis_client.keys(pattern):
            redis_client.delete(lock_key)
282