Completed
Push — master ( 874bf3...20838a )
by Ionel Cristian
54s
created

tests.test_extend_lock_default_expire()   A

Complexity

Conditions 4

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 4
dl 0
loc 8
rs 9.2
1
from __future__ import print_function
2
3
import os
4
import sys
5
import time
6
from collections import defaultdict
7
8
import pytest
9
from process_tests import TestProcess
10
from process_tests import dump_on_error
11
from process_tests import wait_for_strings
12
13
from redis import StrictRedis
14
15
from redis_lock import AlreadyAcquired
16
from redis_lock import InterruptableThread
17
from redis_lock import InvalidTimeout
18
from redis_lock import Lock
19
from redis_lock import NotAcquired
20
from redis_lock import TimeoutTooLarge
21
from redis_lock import TimeoutNotUsable
22
from redis_lock import NotExpirable
23
from redis_lock import reset_all
24
from conf import HELPER
25
from conf import TIMEOUT
26
from conf import UDS_PATH
27
28
29
@pytest.yield_fixture
30
def redis_server(scope='module'):
31
    try:
32
        os.unlink(UDS_PATH)
33
    except OSError:
34
        pass
35
    with TestProcess('redis-server', '--port', '0', '--unixsocket', UDS_PATH) as process:
36
        with dump_on_error(process.read):
37
            wait_for_strings(process.read, TIMEOUT, "Running")
38
            yield process
39
40
41
@pytest.fixture(scope='function')
42
def conn(request, redis_server):
43
    conn_ = StrictRedis(unix_socket_path=UDS_PATH)
44
    request.addfinalizer(conn_.flushdb)
45
    return conn_
46
47
48
def test_simple(redis_server):
49
    with TestProcess(sys.executable, HELPER, 'test_simple') as proc:
50
        with dump_on_error(proc.read):
51
            name = 'lock:foobar'
52
            wait_for_strings(
53
                proc.read, TIMEOUT,
54
                'Getting %r ...' % name,
55
                'Got lock for %r.' % name,
56
                'Releasing %r.' % name,
57
                'UNLOCK_SCRIPT not cached.',
58
                'DIED.',
59
            )
60
61
62
def test_no_block(conn):
63
    with Lock(conn, "foobar"):
64
        with TestProcess(sys.executable, HELPER, 'test_no_block') as proc:
65
            with dump_on_error(proc.read):
66
                name = 'lock:foobar'
67
                wait_for_strings(
68
                    proc.read, TIMEOUT,
69
                    'Getting %r ...' % name,
70
                    'Failed to get %r.' % name,
71
                    'acquire=>False',
72
                    'DIED.',
73
                )
74
75
76
def test_timeout(conn):
77
    with Lock(conn, "foobar"):
78
        lock = Lock(conn, "foobar")
79
        assert lock.acquire(timeout=1) == False
80
81
82
def test_timeout_expire(conn):
83
    with Lock(conn, "foobar", expire=1):
84
        lock = Lock(conn, "foobar")
85
        assert lock.acquire(timeout=2)
86
87
88
def test_timeout_expire_with_renewal(conn):
89
    with Lock(conn, "foobar", expire=1, auto_renewal=True):
90
        lock = Lock(conn, "foobar")
91
        assert lock.acquire(timeout=2) == False
92
93
94
def test_timeout_acquired(conn):
95
    with TestProcess(sys.executable, HELPER, 'test_timeout') as proc:
96
        with dump_on_error(proc.read):
97
            name = 'lock:foobar'
98
            wait_for_strings(
99
                proc.read, TIMEOUT,
100
                'Getting %r ...' % name,
101
                'Got lock for %r.' % name,
102
            )
103
            lock = Lock(conn, "foobar")
104
            assert lock.acquire(timeout=2)
105
106
107
def test_not_usable_timeout(conn):
108
    lock = Lock(conn, "foobar")
109
    with pytest.raises(TimeoutNotUsable):
110
        lock.acquire(blocking=False, timeout=1)
111
112
113
def test_expire_less_than_timeout(conn):
114
    lock = Lock(conn, "foobar", expire=1)
115
    with pytest.raises(TimeoutTooLarge):
116
        lock.acquire(blocking=True, timeout=2)
117
118
119
def test_invalid_timeout(conn):
120
    lock = Lock(conn, "foobar")
121
    with pytest.raises(InvalidTimeout):
122
        lock.acquire(blocking=True, timeout=0)
123
124
    lock = Lock(conn, "foobar")
125
    with pytest.raises(InvalidTimeout):
126
        lock.acquire(blocking=True, timeout=-1)
127
128
129
def test_expire(conn):
130
    with Lock(conn, "foobar", expire=TIMEOUT/4):
131
        with TestProcess(sys.executable, HELPER, 'test_expire') as proc:
132
            with dump_on_error(proc.read):
133
                name = 'lock:foobar'
134
                wait_for_strings(
135
                    proc.read, TIMEOUT,
136
                    'Getting %r ...' % name,
137
                    'Got lock for %r.' % name,
138
                    'Releasing %r.' % name,
139
                    'UNLOCK_SCRIPT not cached.',
140
                    'DIED.',
141
                )
142
    lock = Lock(conn, "foobar")
143
    try:
144
        assert lock.acquire(blocking=False) == True
145
    finally:
146
        lock.release()
147
148
149
def test_extend(conn):
150
    name = 'foobar'
151
    key_name = 'lock:' + name
152
    with Lock(conn, name, expire=100) as lock:
153
        assert conn.ttl(key_name) <= 100
154
155
        lock.extend(expire=1000)
156
        assert conn.ttl(key_name) > 100
157
158
159
def test_extend_lock_default_expire(conn):
160
    name = 'foobar'
161
    key_name = 'lock:' + name
162
    with Lock(conn, name, expire=1000) as lock:
163
        time.sleep(3)
164
        assert conn.ttl(key_name) <= 997
165
        lock.extend()
166
        assert 997 < conn.ttl(key_name) <= 1000
167
168
169
def test_extend_lock_without_expire_fail(conn):
170
    name = 'foobar'
171
    with Lock(conn, name) as lock:
172
        with pytest.raises(NotExpirable):
173
            lock.extend(expire=1000)
174
175
        with pytest.raises(NotExpirable):
176
            lock.extend()
177
178
179
def test_double_acquire(conn):
180
    lock = Lock(conn, "foobar")
181
    with lock:
182
        pytest.raises(RuntimeError, lock.acquire)
183
        pytest.raises(AlreadyAcquired, lock.acquire)
184
185
186
def test_plain(conn):
187
    with Lock(conn, "foobar"):
188
        time.sleep(0.01)
189
190
191
def test_no_overlap(redis_server):
192
    """
193
    This test tries to simulate contention: lots of clients trying to acquire at the same time.
194
    
195
    If there would be a bug that would allow two clients to hold the lock at the same time it 
196
    would most likely regress this test.
197
    
198
    The code here mostly tries to parse out the pid of the process and the time when it got and 
199
    released the lock. If there's is overlap (eg: pid1.start < pid2.start < pid1.end) then we
200
    got a very bad regression on our hands ...
201
    
202
    The subprocess being run (check helper.py) will fork bunch of processes and will try to 
203
    syncronize them (using the builting sched) to try to acquire the lock at the same time.
204
    """
205
    with TestProcess(sys.executable, HELPER, 'test_no_overlap') as proc:
206
        with dump_on_error(proc.read):
207
            name = 'lock:foobar'
208
            wait_for_strings(proc.read, TIMEOUT, 'Getting %r ...' % name)
209
            wait_for_strings(proc.read, TIMEOUT, 'Got lock for %r.' % name)
210
            wait_for_strings(proc.read, TIMEOUT, 'Releasing %r.' % name)
211
            wait_for_strings(proc.read, TIMEOUT, 'UNLOCK_SCRIPT not cached.')
212
            wait_for_strings(proc.read, 10*TIMEOUT, 'DIED.')
213
214
            class Event(object):
215
                pid = start = end = '?'
216
217
                def __str__(self):
218
                    return "Event(%s; %r => %r)" % (self.pid, self.start, self.end)
219
220
            events = defaultdict(Event)
221
            for line in proc.read().splitlines():
222
                try:
223
                    pid, time, junk = line.split(' ', 2)
224
                    pid = int(pid)
225
                except ValueError:
226
                    continue
227
                if 'Got lock for' in junk:
228
                    events[pid].pid = pid
229
                    events[pid].start = time
230
                if 'Releasing' in junk:
231
                    events[pid].pid = pid
232
                    events[pid].end = time
233
            assert len(events) == 125
234
235
            # not very smart but we don't have millions of events so it's 
236
            # ok - compare all the events with all the other events:
237
            for event in events.values():
238
                for other in events.values():
239
                    if other is not event:
240
                        try:
241
                            if other.start < event.start < other.end or \
242
                               other.start < event.end < other.end:
243
                                pytest.fail('%s overlaps %s' % (event, other))
244
                        except:
245
                            print("[%s/%s]" % (event, other))
246
                            raise
247
248
249
def test_reset(conn):
250
    with Lock(conn, "foobar") as lock:
251
        lock.reset()
252
        new_lock = Lock(conn, "foobar")
253
        new_lock.acquire(blocking=False)
254
        new_lock.release()
255
256
257
def test_reset_all(conn):
258
    lock1 = Lock(conn, "foobar1")
259
    lock2 = Lock(conn, "foobar2")
260
    lock1.acquire(blocking=False)
261
    lock2.acquire(blocking=False)
262
    reset_all(conn)
263
    lock1 = Lock(conn, "foobar1")
264
    lock2 = Lock(conn, "foobar2")
265
    lock1.acquire(blocking=False)
266
    lock2.acquire(blocking=False)
267
    lock1.release()
268
    lock2.release()
269
270
271
def test_owner_id(conn):
272
    unique_identifier = b"foobar-identifier"
273
    lock = Lock(conn, "foobar-tok", expire=TIMEOUT/4, id=unique_identifier)
274
    lock_id = lock.id
275
    assert lock_id == unique_identifier
276
    lock.acquire(blocking=False)
277
    assert lock.get_owner_id() == unique_identifier
278
    lock.release()
279
280
281
def test_token(conn):
282
    lock = Lock(conn, "foobar-tok")
283
    tok = lock.id
284
    assert conn.get(lock._name) is None
285
    lock.acquire(blocking=False)
286
    assert conn.get(lock._name) == tok
287
288
289
def test_bogus_release(conn):
290
    lock = Lock(conn, "foobar-tok")
291
    pytest.raises(NotAcquired, lock.release)
292
    lock.release(force=True)
293
294
295
def test_release_from_nonblocking_leaving_garbage(conn):
296
    for _ in range(10):
297
        lock = Lock(conn, 'release_from_nonblocking')
298
        lock.acquire(blocking=False)
299
        lock.release()
300
        assert conn.llen('lock-signal:release_from_nonblocking') == 1
301
302
303
def test_no_auto_renewal(conn):
304
    lock = Lock(conn, 'lock_renewal', expire=3, auto_renewal=False)
305
    assert lock._lock_renewal_interval is None
306
    lock.acquire()
307
    assert lock._lock_renewal_thread is None, "No lock refresh thread should have been spawned"
308
309
310
def test_auto_renewal_bad_values(conn):
311
    with pytest.raises(ValueError):
312
        Lock(conn, 'lock_renewal', expire=None, auto_renewal=True)
313
314
315
def test_auto_renewal(conn):
316
    lock = Lock(conn, 'lock_renewal', expire=3, auto_renewal=True)
317
    lock.acquire()
318
319
    assert isinstance(lock._lock_renewal_thread, InterruptableThread)
320
    assert not lock._lock_renewal_thread.should_exit
321
    assert lock._lock_renewal_interval == 2
322
323
    time.sleep(3)
324
    assert conn.get(lock._name) == lock.id, "Key expired but it should have been getting renewed"
325
326
    lock.release()
327
    assert lock._lock_renewal_thread is None
328