Issues (2)

tests/test_redis_lock.py (2 issues)

1
from __future__ import division
2
from __future__ import print_function
3
4
import gc
5
import multiprocessing
6
import os
7
import platform
8
import sys
9
import threading
10
import time
11
from collections import defaultdict
12
13
import pytest
14
from process_tests import TestProcess
15
from process_tests import dump_on_error
16
from process_tests import wait_for_strings
17
from redis import StrictRedis
18
19
from redis_lock import AlreadyAcquired
20
from redis_lock import InvalidTimeout
21
from redis_lock import Lock
22
from redis_lock import NotAcquired
23
from redis_lock import NotExpirable
24
from redis_lock import TimeoutNotUsable
25
from redis_lock import TimeoutTooLarge
26
from redis_lock import reset_all
27
28
from conf import HELPER
29
from conf import TIMEOUT
30
from conf import UDS_PATH
31
32
33
@pytest.fixture
def redis_server(scope='module'):
    """Start a ``redis-server`` subprocess on a unix socket and yield it.

    Any existing file at ``UDS_PATH`` is unlinked first. The server is started
    with ``--port 0 --unixsocket UDS_PATH`` and the fixture yields the
    ``TestProcess`` only once the string "Running" appears in its output.

    NOTE(review): ``scope`` is a plain default argument, not a fixture scope;
    the decorator is given no ``scope=``, so this fixture is function-scoped.
    The parameter is kept only so the signature stays unchanged. Presumably
    the tests that wait for 'UNLOCK_SCRIPT not cached.' depend on getting a
    fresh server for every test -- confirm before moving the scope into the
    decorator.
    """
    try:
        os.unlink(UDS_PATH)
    except OSError:
        # Best effort cleanup: a failure to unlink (typically because the
        # socket file does not exist) must not prevent the server start.
        pass
    with TestProcess('redis-server', '--port', '0', '--unixsocket', UDS_PATH) as process:
        # Dump the server output if anything below raises.
        with dump_on_error(process.read):
            wait_for_strings(process.read, TIMEOUT, "Running")
            yield process
43
44
45
@pytest.fixture(scope='function')
def make_conn(request, redis_server):
    """Provide a factory that opens Redis connections over ``UDS_PATH``.

    Every connection created through the factory gets its ``flushdb`` method
    registered as a finalizer on the requesting test.
    """
    def make_conn_factory():
        client = StrictRedis(unix_socket_path=UDS_PATH)
        request.addfinalizer(client.flushdb)
        return client

    return make_conn_factory
54
55
56
@pytest.fixture(scope='function')
def conn(request, make_conn):
    """Provide one Redis connection built by the ``make_conn`` factory."""
    connection = make_conn()
    return connection
59
60
61
@pytest.fixture
def make_process(request):
    """Provide a factory for ``multiprocessing.Process`` objects.

    Each process built by the factory has its ``terminate`` method registered
    as a finalizer, so it is terminated after the test run.
    """
    def make_process_factory(*args, **kwargs):
        child = multiprocessing.Process(*args, **kwargs)
        request.addfinalizer(child.terminate)
        return child

    return make_process_factory
73
74
75
def test_simple(redis_server):
    """The helper's 'test_simple' scenario reports a full acquire/release cycle."""
    with TestProcess(sys.executable, HELPER, 'test_simple') as proc:
        with dump_on_error(proc.read):
            name = 'lock:foobar'
            # Strings the helper subprocess is expected to print.
            expected = (
                'Getting %r ...' % name,
                'Got lock for %r.' % name,
                'Releasing %r.' % name,
                'UNLOCK_SCRIPT not cached.',
                'DIED.',
            )
            wait_for_strings(proc.read, TIMEOUT, *expected)
87
88
89
def test_no_block(conn):
    """While this process holds the lock, the helper reports a failed acquire."""
    with Lock(conn, "foobar"):
        with TestProcess(sys.executable, HELPER, 'test_no_block') as proc:
            with dump_on_error(proc.read):
                name = 'lock:foobar'
                # Strings the helper subprocess is expected to print.
                expected = (
                    'Getting %r ...' % name,
                    'Failed to get %r.' % name,
                    'acquire=>False',
                    'DIED.',
                )
                wait_for_strings(proc.read, TIMEOUT, *expected)
101
102
103
def test_timeout(conn):
    """Acquiring a held lock with ``timeout=1`` gives up and returns False."""
    with Lock(conn, "foobar"):
        lock = Lock(conn, "foobar")
        # Identity check instead of ``== False``, matching the ``is False``
        # assertions used elsewhere in this module.
        assert lock.acquire(timeout=1) is False
107
108
109
def test_timeout_expire(conn):
    """A lock created with ``expire=1`` can be taken by another instance within a 2s timeout."""
    holder = Lock(conn, "foobar", expire=1)
    holder.acquire()

    waiter = Lock(conn, "foobar")
    got_it = waiter.acquire(timeout=2)
    assert got_it
114
115
116
def test_timeout_expire_with_renewal(conn):
    """With ``auto_renewal=True`` the held lock is not obtainable within a 2s timeout.

    The holder is created with ``expire=1``, so the second acquire is expected
    to fail even though its timeout is longer than the expiry.
    """
    with Lock(conn, "foobar", expire=1, auto_renewal=True):
        lock = Lock(conn, "foobar")
        # Identity check instead of ``== False``, matching the ``is False``
        # assertions used elsewhere in this module.
        assert lock.acquire(timeout=2) is False
120
121
122
def test_timeout_acquired(conn):
    """After the helper reports it got the lock, this process acquires it within 2s."""
    with TestProcess(sys.executable, HELPER, 'test_timeout') as proc:
        with dump_on_error(proc.read):
            name = 'lock:foobar'
            # Wait until the helper subprocess reports holding the lock.
            expected = (
                'Getting %r ...' % name,
                'Got lock for %r.' % name,
            )
            wait_for_strings(proc.read, TIMEOUT, *expected)

            contender = Lock(conn, "foobar")
            assert contender.acquire(timeout=2)
133
134
135
def test_not_usable_timeout(conn):
    """Passing a timeout together with ``blocking=False`` raises TimeoutNotUsable."""
    non_blocking = Lock(conn, "foobar")
    pytest.raises(TimeoutNotUsable, non_blocking.acquire, blocking=False, timeout=1)
139
140
141
def test_expire_less_than_timeout(conn):
    """A timeout of 2 on a lock with ``expire=1`` raises TimeoutTooLarge."""
    short_lived = Lock(conn, "foobar", expire=1)
    pytest.raises(TimeoutTooLarge, short_lived.acquire, blocking=True, timeout=2)
145
146
147
def test_invalid_timeout(conn):
    """Blocking acquires with a timeout of 0 or -1 raise InvalidTimeout."""
    for bad_timeout in (0, -1):
        # A fresh Lock instance is used for each rejected value.
        lock = Lock(conn, "foobar")
        with pytest.raises(InvalidTimeout):
            lock.acquire(blocking=True, timeout=bad_timeout)
155
156
157
def test_expire(conn):
    """A lock held with an expiry does not block others forever.

    This process acquires the lock with ``expire=TIMEOUT/4`` and never
    releases it. The helper subprocess must still report getting and releasing
    the lock, after which a fresh ``Lock`` in this process must be acquirable
    without blocking.
    """
    lock = Lock(conn, "foobar", expire=TIMEOUT/4)
    lock.acquire()
    with TestProcess(sys.executable, HELPER, 'test_expire') as proc:
        with dump_on_error(proc.read):
            name = 'lock:foobar'
            wait_for_strings(
                proc.read, TIMEOUT,
                'Getting %r ...' % name,
                'Got lock for %r.' % name,
                'Releasing %r.' % name,
                'UNLOCK_SCRIPT not cached.',
                'DIED.',
            )
    lock = Lock(conn, "foobar")
    # Acquire outside the try block so its outcome is known in ``finally``.
    acquired = lock.acquire(blocking=False)
    try:
        assert acquired is True
    finally:
        # Release only what was actually acquired: releasing a lock that was
        # not acquired raises NotAcquired, which would mask the assertion
        # failure above.
        if acquired:
            lock.release()
176
177
178
def test_expire_without_timeout(conn):
    """A blocking acquire without timeout succeeds once the holder's lock is gone.

    The holder uses ``expire=2`` and is never released explicitly.
    """
    first_lock = Lock(conn, 'expire', expire=2)
    second_lock = Lock(conn, 'expire', expire=1)
    first_lock.acquire()

    immediate = second_lock.acquire(blocking=False)
    assert immediate is False

    eventual = second_lock.acquire()
    assert eventual is True
    second_lock.release()
185
186
187
def test_extend(conn):
    """Extending a held lock with ``expire=1000`` raises the key's TTL above 100."""
    name = 'foobar'
    key_name = 'lock:' + name
    with Lock(conn, name, expire=100) as held:
        ttl_before = conn.ttl(key_name)
        assert ttl_before <= 100

        held.extend(expire=1000)
        ttl_after = conn.ttl(key_name)
        assert ttl_after > 100
195
196
197
def test_extend_lock_default_expire(conn):
    """``extend()`` without arguments brings the TTL back into the (997, 1000] range."""
    name = 'foobar'
    key_name = 'lock:' + name
    with Lock(conn, name, expire=1000) as held:
        # Let three seconds of the TTL elapse before extending.
        time.sleep(3)
        ttl_before = conn.ttl(key_name)
        assert ttl_before <= 997

        held.extend()
        ttl_after = conn.ttl(key_name)
        assert 997 < ttl_after <= 1000
205
206
207
def test_extend_lock_without_expire_fail(conn):
    """A lock created without ``expire`` cannot be extended.

    ``extend(expire=...)`` raises NotExpirable and a bare ``extend()`` raises
    TypeError.
    """
    name = 'foobar'
    with Lock(conn, name) as held:
        pytest.raises(NotExpirable, held.extend, expire=1000)
        pytest.raises(TypeError, held.extend)
215
216
217
def test_extend_another_instance(conn):
    """A second Lock instance with the same name and id can extend the lock."""
    name = 'foobar'
    key_name = 'lock:' + name

    owner = Lock(conn, name, expire=100)
    owner.acquire()
    initial_ttl = conn.ttl(key_name)
    assert 0 <= initial_ttl <= 100

    # Same name and same id as the instance that acquired the lock.
    twin = Lock(conn, name, id=owner.id)
    twin.extend(1000)

    extended_ttl = conn.ttl(key_name)
    assert extended_ttl > 100
231
232
233
def test_extend_another_instance_different_id_fail(conn):
    """A Lock instance with the same name but another id cannot extend the lock.

    The attempt raises NotAcquired and the key's TTL stays at or below 100.
    """
    name = 'foobar'
    key_name = 'lock:' + name

    owner = Lock(conn, name, expire=100)
    owner.acquire()
    initial_ttl = conn.ttl(key_name)
    assert 0 <= initial_ttl <= 100

    # No id is passed here, so this instance does not reuse the owner's id.
    stranger = Lock(conn, name)
    pytest.raises(NotAcquired, stranger.extend, 1000)

    ttl_after_attempt = conn.ttl(key_name)
    assert ttl_after_attempt <= 100
    assert owner.id != stranger.id
249
250
251
def test_double_acquire(conn):
    """Acquiring an already held Lock instance raises AlreadyAcquired.

    The first check shows the error is also catchable as RuntimeError.
    """
    lock = Lock(conn, "foobar")
    with lock:
        with pytest.raises(RuntimeError):
            lock.acquire()
        with pytest.raises(AlreadyAcquired):
            lock.acquire()
256
257
258
def test_plain(conn):
    """Smoke test: use a Lock as a context manager around a short sleep."""
    lock = Lock(conn, "foobar")
    with lock:
        time.sleep(0.01)
261
262
263
def test_no_overlap(redis_server):
    """
    This test tries to simulate contention: lots of clients trying to acquire at the same time.

    If there were a bug that allowed two clients to hold the lock at the same time, it
    would most likely make this test fail.

    The code here mostly parses out the pid of each process and the time when it got and
    released the lock. If there is overlap (eg: pid1.start < pid2.start < pid1.end) then we
    have a very bad regression on our hands ...

    The subprocess being run (check helper.py) will fork a bunch of processes and will try to
    synchronize them (using the builtin sched) to try to acquire the lock at the same time.
    """
    with TestProcess(sys.executable, HELPER, 'test_no_overlap') as proc:
        with dump_on_error(proc.read):
            name = 'lock:foobar'
            wait_for_strings(proc.read, 10*TIMEOUT, 'Getting %r ...' % name)
            wait_for_strings(proc.read, 10*TIMEOUT, 'Got lock for %r.' % name)
            wait_for_strings(proc.read, 10*TIMEOUT, 'Releasing %r.' % name)
            wait_for_strings(proc.read, 10*TIMEOUT, 'UNLOCK_SCRIPT not cached.')
            wait_for_strings(proc.read, 10*TIMEOUT, 'DIED.')

            class Event(object):
                # '?' marks a field that was never seen in the helper output.
                pid = start = end = '?'

                def __str__(self):
                    return "Event(%s; %r => %r)" % (self.pid, self.start, self.end)

            events = defaultdict(Event)
            for line in proc.read().splitlines():
                try:
                    # Named ``timestamp`` so the imported ``time`` module is
                    # not shadowed inside this function.
                    pid, timestamp, junk = line.split(' ', 2)
                    pid = int(pid)
                except ValueError:
                    # Not a "<pid> <time> <message>" line; ignore it.
                    continue
                if 'Got lock for' in junk:
                    events[pid].pid = pid
                    events[pid].start = timestamp
                if 'Releasing' in junk:
                    events[pid].pid = pid
                    events[pid].end = timestamp
            assert len(events) == 125

            # NOTE(review): start/end are compared as the raw strings emitted
            # by the helper -- this assumes their string order matches
            # chronological order; confirm against the helper's output format.
            #
            # not very smart but we don't have millions of events so it's
            # ok - compare all the events with all the other events:
            for event in events.values():
                for other in events.values():
                    if other is not event:
                        try:
                            if other.start < event.start < other.end or \
                               other.start < event.end < other.end:
                                pytest.fail('%s overlaps %s' % (event, other))
                        except BaseException:
                            # Deliberately as broad as the former bare except:
                            # print the offending pair for any error, then
                            # re-raise it untouched.
                            print("[%s/%s]" % (event, other))
                            raise
319
320
321
# Number of worker processes started by test_no_overlap2.
NWORKERS = 125
322
323
@pytest.mark.skipif(platform.python_implementation() == 'PyPy', reason="This appears to be way too slow to run on PyPy")
def test_no_overlap2(make_process, make_conn):
    """Contention test, second version: built on multiprocessing.

    ``NWORKERS`` processes are started; each reports readiness through a
    shared counter, waits for a common start event and then tries a blocking
    acquire of the same lock. One second after the start event exactly one
    worker must have acquired it.
    """
    start_signal = multiprocessing.Event()
    counter_guard = multiprocessing.Lock()
    counter = multiprocessing.Value('H', 0)

    def contend(go, count_lock, count):
        redis_lock = Lock(make_conn(), 'lock')
        # First use of the counter: announce that this worker is ready.
        with count_lock:
            count.value += 1

        go.wait()

        # Second use of the counter: record a successful acquire.
        if redis_lock.acquire(blocking=True):
            with count_lock:
                count.value += 1

    for _ in range(NWORKERS):
        worker = make_process(target=contend, args=(start_signal, counter_guard, counter))
        worker.start()

    # Poll until every worker has reached the point where it is ready to
    # acquire the redis lock.
    while counter.value < NWORKERS:
        time.sleep(0.5)

    # From here on the counter tracks how many workers acquired the
    # redis lock successfully.
    counter.value = 0

    start_signal.set()

    time.sleep(1)

    assert counter.value == 1
358
359
360
def test_reset(conn):
    """After ``reset()`` another instance can acquire and release the lock.

    Releasing the instance that was reset (and never acquired) raises
    NotAcquired.
    """
    stale = Lock(conn, "foobar")
    stale.reset()

    fresh = Lock(conn, "foobar")
    fresh.acquire(blocking=False)
    fresh.release()

    with pytest.raises(NotAcquired):
        stale.release()
367
368
369
def test_reset_all(conn):
    """After ``reset_all`` new instances of both locks can be acquired and released."""
    held_a = Lock(conn, "foobar1")
    held_b = Lock(conn, "foobar2")
    held_a.acquire(blocking=False)
    held_b.acquire(blocking=False)

    reset_all(conn)

    # New instances for the same two names.
    fresh_a = Lock(conn, "foobar1")
    fresh_b = Lock(conn, "foobar2")
    fresh_a.acquire(blocking=False)
    fresh_b.acquire(blocking=False)
    fresh_a.release()
    fresh_b.release()
381
382
383
def test_owner_id(conn):
    """The ``id`` passed to the constructor is exposed unchanged as ``lock.id``."""
    unique_identifier = b"foobar-identifier"
    lock = Lock(conn, "foobar-tok", expire=TIMEOUT/4, id=unique_identifier)
    assert lock.id == unique_identifier
388
389
390
def test_get_owner_id(conn):
    """While the lock is held, ``get_owner_id()`` equals the holder's ``id``."""
    held = Lock(conn, "foobar-tok")
    held.acquire()
    owner_id = held.get_owner_id()
    assert owner_id == held.id
    held.release()
395
396
397
def test_token(conn):
    """The lock key is absent before acquire and holds the lock id afterwards."""
    lock = Lock(conn, "foobar-tok")
    tok = lock.id

    stored_before = conn.get(lock._name)
    assert stored_before is None

    lock.acquire(blocking=False)
    stored_after = conn.get(lock._name)
    assert stored_after == tok
403
404
405
def test_bogus_release(conn):
    """Releasing an unacquired lock raises NotAcquired.

    A second instance created with the same id can release a held lock.
    """
    original = Lock(conn, "foobar-tok")
    with pytest.raises(NotAcquired):
        original.release()

    original.acquire()
    same_id = Lock(conn, "foobar-tok", id=original.id)
    same_id.release()
411
412
413
def test_no_auto_renewal(conn):
    """With ``auto_renewal=False`` there is no renewal interval and no renewal thread."""
    plain = Lock(conn, 'lock_renewal', expire=3, auto_renewal=False)
    assert plain._lock_renewal_interval is None

    plain.acquire()
    renewal_thread = plain._lock_renewal_thread
    assert renewal_thread is None, "No lock refresh thread should have been spawned"
418
419
420
def test_auto_renewal_bad_values(conn):
    """Requesting auto renewal with ``expire=None`` raises ValueError."""
    pytest.raises(ValueError, Lock, conn, 'lock_renewal', expire=None, auto_renewal=True)
423
424
425
def test_auto_renewal(conn):
    """An auto-renewing lock keeps its key past the expiry and stops renewing on release."""
    renewing = Lock(conn, 'lock_renewal', expire=3, auto_renewal=True)
    renewing.acquire()

    # Renewal machinery as it looks right after acquire.
    assert isinstance(renewing._lock_renewal_thread, threading.Thread)
    assert not renewing._lock_renewal_stop.is_set()
    interval = renewing._lock_renewal_interval
    assert isinstance(interval, float)
    assert interval == 2

    # Sleep for the full expiry time; the key must still hold the lock id.
    time.sleep(3)
    stored = conn.get(renewing._name)
    assert stored == renewing.id, "Key expired but it should have been getting renewed"

    renewing.release()
    assert renewing._lock_renewal_thread is None
439
440
441
def test_signal_cleanup_on_release(conn):
    """Releasing a lock leaves no signal key behind."""
    signal_key = 'lock-signal:foo'
    lock = Lock(conn, 'foo')
    lock.acquire()
    lock.release()

    assert conn.llen(signal_key) == 0
    assert conn.exists(signal_key) == 0
448
449
450
def test_signal_cleanup_on_reset(conn):
    """Resetting a lock leaves no signal key behind."""
    signal_key = 'lock-signal:foo'
    lock = Lock(conn, 'foo')
    lock.acquire()
    lock.reset()

    assert conn.llen(signal_key) == 0
    assert conn.exists(signal_key) == 0
457
458
459
def test_signal_cleanup_on_reset_all(conn):
    """After resetting all locks, no signal key should remain."""
    signal_key = 'lock-signal:foo'
    lock = Lock(conn, 'foo')
    lock.acquire()
    reset_all(conn)

    assert conn.llen(signal_key) == 0
    assert conn.exists(signal_key) == 0
466
467
468
def test_reset_signalizes(make_conn, make_process):
    """A waiter blocked in ``acquire()`` gets unblocked by ``reset()``.

    The worker process sets a shared flag once its blocking acquire succeeds.
    """
    def blocked_waiter(unblocked):
        worker_conn = make_conn()
        worker_lock = Lock(worker_conn, 'lock')
        if worker_lock.acquire():
            unblocked.value = 1

    flag = multiprocessing.Value('B', 0)
    conn = make_conn()
    holder = Lock(conn, 'lock')
    holder.acquire()

    worker = make_process(target=blocked_waiter, args=(flag,))
    worker.start()
    # Wait up to half a second on the worker before and after the reset.
    worker.join(0.5)
    holder.reset()
    worker.join(0.5)

    assert flag.value == 1
489
490
491
def test_reset_all_signalizes(make_conn, make_process):
    """A waiter blocked on two held locks gets unblocked by ``reset_all()``.

    The worker process sets a shared flag once both of its blocking acquires
    succeed.
    """
    def blocked_waiter(unblocked):
        worker_conn = make_conn()
        first = Lock(worker_conn, 'lock1')
        second = Lock(worker_conn, 'lock2')
        if first.acquire() and second.acquire():
            unblocked.value = 1

    flag = multiprocessing.Value('B', 0)
    conn = make_conn()
    holder1 = Lock(conn, 'lock1')
    holder2 = Lock(conn, 'lock2')
    holder1.acquire()
    holder2.acquire()

    worker = make_process(target=blocked_waiter, args=(flag,))
    worker.start()
    # Wait up to half a second on the worker before and after the reset.
    worker.join(0.5)
    reset_all(conn)
    worker.join(0.5)

    assert flag.value == 1
515
516
517
def test_auto_renewal_stops_after_gc(conn):
    """The renewal thread stops and the key disappears after the lock is garbage collected."""
    lock = Lock(conn, 'spam', auto_renewal=True, expire=1)
    name = lock._name
    lock.acquire(blocking=True)
    renewal_thread = lock._lock_renewal_thread
    # Drop the only reference held by this test and force a collection.
    del lock
    gc.collect()

    # Join the thread in short slices until it has stopped or the time
    # budget is used up.
    waited = 0
    step = 0.1
    while waited <= 5:
        waited += step
        renewal_thread.join(step)
        if not renewal_thread.is_alive():
            break

    # Sleep longer than the 1 second expiry before checking the key.
    time.sleep(1.5)

    assert not renewal_thread.is_alive()
    assert conn.get(name) is None
538
539
540
def test_given_id(conn):
    """A Lock created with an explicit id acts on behalf of the holder with that id.

    Checked here: an ``object()`` id is rejected with TypeError; a second
    instance with the same id cannot acquire (AlreadyAcquired) but can extend
    and release the lock; afterwards the original instance's release raises
    NotAcquired and the key's TTL is -2.
    """
    name = 'foobar'
    key_name = 'lock:' + name
    orig = Lock(conn, name, expire=100, id=b"a")
    orig.acquire()
    pytest.raises(TypeError, Lock, conn, name, id=object())
    lock = Lock(conn, name, id=b"a")
    pytest.raises(AlreadyAcquired, lock.acquire)
    lock.extend(100)
    lock.release()  # this works, note that this ain't the object that acquired the lock
    pytest.raises(NotAcquired, orig.release)  # and this fails because lock was released above

    assert conn.ttl(key_name) == -2
556
557
558
def test_strict_check():
    """A non-Redis client raises ValueError unless ``strict=False`` is passed."""
    with pytest.raises(ValueError):
        Lock(object(), name='foobar')

    Lock(object(), name='foobar', strict=False)
561