Completed
Push — master ( 95d8d5...80ebcd )
by Ionel Cristian
6s
created

test_signal_cleanup_on_reset()   A

Complexity

Conditions 2

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 2
dl 0
loc 6
rs 9.4285
1
from __future__ import print_function, division
2
3
import os
4
import gc
5
import platform
6
import sys
7
import time
8
import threading
9
from collections import defaultdict
10
import multiprocessing
11
12
import pytest
13
from process_tests import TestProcess
14
from process_tests import dump_on_error
15
from process_tests import wait_for_strings
16
17
from redis import StrictRedis
18
19
from redis_lock import AlreadyAcquired
20
from redis_lock import InvalidTimeout
21
from redis_lock import Lock
22
from redis_lock import NotAcquired
23
from redis_lock import TimeoutTooLarge
24
from redis_lock import TimeoutNotUsable
25
from redis_lock import NotExpirable
26
from redis_lock import reset_all
27
from conf import HELPER
28
from conf import TIMEOUT
29
from conf import UDS_PATH
30
31
32
@pytest.yield_fixture(scope='module')
def redis_server():
    """Spawn a redis-server listening on UDS_PATH, shared by the module.

    NOTE(review): the original declared ``scope='module'`` as a *function
    parameter*; pytest does not apply fixture scope that way, so the intended
    module scope was silently ignored.  Moved into the decorator.
    """
    # Remove a stale socket file left over from a previous run.
    try:
        os.unlink(UDS_PATH)
    except OSError:
        pass
    with TestProcess('redis-server', '--port', '0', '--unixsocket', UDS_PATH) as process:
        with dump_on_error(process.read):
            # Don't yield until the server reports it is up and running.
            wait_for_strings(process.read, TIMEOUT, "Running")
            yield process
42
43
44
@pytest.fixture(scope='function')
def make_conn(request, redis_server):
    """Redis connection factory.

    Every connection produced here gets its database flushed at teardown so
    tests do not leak keys into each other.
    """
    def factory():
        connection = StrictRedis(unix_socket_path=UDS_PATH)
        request.addfinalizer(connection.flushdb)
        return connection

    return factory
53
54
55
@pytest.fixture(scope='function')
def conn(make_conn):
    """A single Redis connection (flushed at teardown via make_conn).

    NOTE(review): dropped the unused ``request`` parameter — the finalizer
    is already registered inside ``make_conn``.
    """
    return make_conn()
58
59
60
@pytest.fixture
def make_process(request):
    """Process factory, that makes processes, that terminate themselves
    after a test run.
    """
    def factory(*args, **kwargs):
        worker = multiprocessing.Process(*args, **kwargs)
        # Guarantee no worker outlives its test.
        request.addfinalizer(worker.terminate)
        return worker

    return factory
72
73
74
def test_simple(redis_server):
    """The helper acquires, holds and releases the lock end to end."""
    name = 'lock:foobar'
    with TestProcess(sys.executable, HELPER, 'test_simple') as proc:
        with dump_on_error(proc.read):
            wait_for_strings(
                proc.read, TIMEOUT,
                'Getting %r ...' % name,
                'Got lock for %r.' % name,
                'Releasing %r.' % name,
                'UNLOCK_SCRIPT not cached.',
                'DIED.',
            )
86
87
88
def test_no_block(conn):
    """A non-blocking acquire in another process fails while we hold the lock."""
    name = 'lock:foobar'
    with Lock(conn, "foobar"):
        with TestProcess(sys.executable, HELPER, 'test_no_block') as proc:
            with dump_on_error(proc.read):
                wait_for_strings(
                    proc.read, TIMEOUT,
                    'Getting %r ...' % name,
                    'Failed to get %r.' % name,
                    'acquire=>False',
                    'DIED.',
                )
100
101
102
def test_timeout(conn):
    """acquire() with a timeout returns False while the lock is held."""
    with Lock(conn, "foobar"):
        lock = Lock(conn, "foobar")
        # Compare the returned bool with ``is`` rather than ``==``
        # (PEP 8 / flake8 E712).
        assert lock.acquire(timeout=1) is False
106
107
108
def test_timeout_expire(conn):
    """A waiter with a long enough timeout wins once the holder expires."""
    holder = Lock(conn, "foobar", expire=1)
    holder.acquire()
    waiter = Lock(conn, "foobar")
    # The holder's key expires after 1 second, so 2 seconds is enough.
    assert waiter.acquire(timeout=2)
113
114
115
def test_timeout_expire_with_renewal(conn):
    """Auto-renewal keeps the key alive, so a 2s waiter still fails."""
    with Lock(conn, "foobar", expire=1, auto_renewal=True):
        lock = Lock(conn, "foobar")
        # Compare the returned bool with ``is`` rather than ``==``
        # (PEP 8 / flake8 E712).
        assert lock.acquire(timeout=2) is False
119
120
121
def test_timeout_acquired(conn):
    """We can take over the lock the helper holds, within our timeout."""
    name = 'lock:foobar'
    with TestProcess(sys.executable, HELPER, 'test_timeout') as proc:
        with dump_on_error(proc.read):
            wait_for_strings(
                proc.read, TIMEOUT,
                'Getting %r ...' % name,
                'Got lock for %r.' % name,
            )
            # The helper lets go soon enough for a 2 second timeout.
            assert Lock(conn, "foobar").acquire(timeout=2)
132
133
134
def test_not_usable_timeout(conn):
    """Combining a timeout with a non-blocking acquire is rejected."""
    foobar_lock = Lock(conn, "foobar")
    with pytest.raises(TimeoutNotUsable):
        foobar_lock.acquire(blocking=False, timeout=1)
138
139
140
def test_expire_less_than_timeout(conn):
    """A timeout longer than the expiry can never be honored — rejected."""
    short_lived = Lock(conn, "foobar", expire=1)
    with pytest.raises(TimeoutTooLarge):
        short_lived.acquire(blocking=True, timeout=2)
144
145
146
def test_invalid_timeout(conn):
    """Zero and negative timeouts are both rejected."""
    for bad_timeout in (0, -1):
        lock = Lock(conn, "foobar")
        with pytest.raises(InvalidTimeout):
            lock.acquire(blocking=True, timeout=bad_timeout)
154
155
156
def test_expire(conn):
    """An expired lock can be re-acquired without being released first."""
    lock = Lock(conn, "foobar", expire=TIMEOUT/4)
    lock.acquire()
    # Run the helper long enough for our key to expire in the meantime.
    with TestProcess(sys.executable, HELPER, 'test_expire') as proc:
        with dump_on_error(proc.read):
            name = 'lock:foobar'
            wait_for_strings(
                proc.read, TIMEOUT,
                'Getting %r ...' % name,
                'Got lock for %r.' % name,
                'Releasing %r.' % name,
                'UNLOCK_SCRIPT not cached.',
                'DIED.',
            )
    lock = Lock(conn, "foobar")
    try:
        # Compare the returned bool with ``is`` rather than ``==``
        # (PEP 8 / flake8 E712).
        assert lock.acquire(blocking=False) is True
    finally:
        lock.release()
175
176
177
def test_expire_without_timeout(conn):
    """A blocking acquire eventually succeeds when the holder expires."""
    first_lock = Lock(conn, 'expire', expire=2)
    second_lock = Lock(conn, 'expire', expire=1)
    first_lock.acquire()
    # Immediately the name is taken...
    assert second_lock.acquire(blocking=False) is False
    # ...but blocking outlasts first_lock's 2 second expiry.
    assert second_lock.acquire() is True
    second_lock.release()
184
185
186
def test_extend(conn):
    """extend() with an explicit expire pushes the TTL past the original."""
    name = 'foobar'
    key_name = 'lock:' + name
    with Lock(conn, name, expire=100) as lock:
        assert conn.ttl(key_name) <= 100
        lock.extend(expire=1000)
        assert conn.ttl(key_name) > 100
194
195
196
def test_extend_lock_default_expire(conn):
    """extend() without arguments reuses the expire set at construction."""
    name = 'foobar'
    key_name = 'lock:' + name
    with Lock(conn, name, expire=1000) as lock:
        time.sleep(3)
        # The TTL has been ticking down for ~3 seconds.
        assert conn.ttl(key_name) <= 997
        lock.extend()
        # ...and is now back near the full 1000 seconds.
        assert 997 < conn.ttl(key_name) <= 1000
204
205
206
def test_extend_lock_without_expire_fail(conn):
    """A lock created without expire cannot be extended in any way."""
    with Lock(conn, 'foobar') as lock:
        # Explicit expire: the lock is not expirable at all.
        with pytest.raises(NotExpirable):
            lock.extend(expire=1000)
        # No expire to fall back on either.
        with pytest.raises(TypeError):
            lock.extend()
214
215
216
def test_extend_another_instance(conn):
    """It is possible to extend a lock using another instance of Lock with the
    same name.
    """
    name = 'foobar'
    key_name = 'lock:' + name
    holder = Lock(conn, name, expire=100)
    holder.acquire()
    assert 0 <= conn.ttl(key_name) <= 100

    # Sharing the holder's id is what makes the extension legal.
    twin = Lock(conn, name, id=holder.id)
    twin.extend(1000)

    assert conn.ttl(key_name) > 100
230
231
232
def test_extend_another_instance_different_id_fail(conn):
    """It is impossible to extend a lock using another instance of Lock with
    the same name, but different id.
    """
    name = 'foobar'
    key_name = 'lock:' + name
    holder = Lock(conn, name, expire=100)
    holder.acquire()
    assert 0 <= conn.ttl(key_name) <= 100

    stranger = Lock(conn, name)
    with pytest.raises(NotAcquired):
        stranger.extend(1000)

    # TTL unchanged and the ids really were distinct.
    assert conn.ttl(key_name) <= 100
    assert holder.id != stranger.id
248
249
250
def test_double_acquire(conn):
    """Re-acquiring a held lock raises, both as the base and specific type."""
    lock = Lock(conn, "foobar")
    with lock:
        with pytest.raises(RuntimeError):
            lock.acquire()
        with pytest.raises(AlreadyAcquired):
            lock.acquire()
255
256
257
def test_plain(conn):
    """Plain context-manager usage acquires and releases cleanly."""
    with Lock(conn, "foobar"):
        time.sleep(0.01)
260
261
262
def test_no_overlap(redis_server):
    """
    This test tries to simulate contention: lots of clients trying to acquire at the same time.

    If there were a bug that allowed two clients to hold the lock at the same time it
    would most likely regress this test.

    The code here mostly tries to parse out the pid of the process and the time when it got and
    released the lock. If there is overlap (eg: pid1.start < pid2.start < pid1.end) then we
    got a very bad regression on our hands ...

    The subprocess being run (check helper.py) will fork a bunch of processes and will try to
    synchronize them (using the builtin sched) to try to acquire the lock at the same time.
    """
    with TestProcess(sys.executable, HELPER, 'test_no_overlap') as proc:
        with dump_on_error(proc.read):
            name = 'lock:foobar'
            wait_for_strings(proc.read, 10*TIMEOUT, 'Getting %r ...' % name)
            wait_for_strings(proc.read, 10*TIMEOUT, 'Got lock for %r.' % name)
            wait_for_strings(proc.read, 10*TIMEOUT, 'Releasing %r.' % name)
            wait_for_strings(proc.read, 10*TIMEOUT, 'UNLOCK_SCRIPT not cached.')
            wait_for_strings(proc.read, 10*TIMEOUT, 'DIED.')

            class Event(object):
                pid = start = end = '?'

                def __str__(self):
                    return "Event(%s; %r => %r)" % (self.pid, self.start, self.end)

            events = defaultdict(Event)
            for line in proc.read().splitlines():
                try:
                    # NOTE(review): renamed the middle field from ``time`` to
                    # ``stamp`` — it shadowed the ``time`` module imported at
                    # the top of the file.
                    pid, stamp, junk = line.split(' ', 2)
                    pid = int(pid)
                except ValueError:
                    # Not an "<pid> <timestamp> <message>" line — skip it.
                    continue
                if 'Got lock for' in junk:
                    events[pid].pid = pid
                    events[pid].start = stamp
                if 'Releasing' in junk:
                    events[pid].pid = pid
                    events[pid].end = stamp
            # helper.py forks exactly 125 workers — keep these in sync.
            assert len(events) == 125

            # not very smart but we don't have millions of events so it's
            # ok - compare all the events with all the other events:
            for event in events.values():
                for other in events.values():
                    if other is not event:
                        try:
                            if other.start < event.start < other.end or \
                               other.start < event.end < other.end:
                                pytest.fail('%s overlaps %s' % (event, other))
                        except BaseException:
                            # Explicit BaseException (was a bare ``except:``)
                            # so pytest's failure exception is still printed
                            # with the offending pair before re-raising.
                            print("[%s/%s]" % (event, other))
                            raise
318
319
320
NWORKERS = 125


@pytest.mark.skipif(platform.python_implementation() == 'PyPy', reason="This appears to be way too slow to run on PyPy")
def test_no_overlap2(make_process, make_conn):
    """The second version of contention test, that uses multiprocessing."""
    start_signal = multiprocessing.Event()
    counter_guard = multiprocessing.Lock()
    counter = multiprocessing.Value('H', 0)

    def worker(signal, guard, tally):
        # Create the redis lock up-front, check in as "ready", then wait so
        # every worker contends at the same instant.
        redis_lock = Lock(make_conn(), 'lock')
        with guard:
            tally.value += 1

        signal.wait()

        if redis_lock.acquire(blocking=True):
            with guard:
                tally.value += 1

    for _ in range(NWORKERS):
        make_process(target=worker, args=(start_signal, counter_guard, counter)).start()

    # Wait until all workers will come to the point where they are ready to
    # acquire the redis lock.
    while counter.value < NWORKERS:
        time.sleep(0.5)

    # From here on the shared value counts the workers which acquired the
    # redis lock with success.
    counter.value = 0

    start_signal.set()

    time.sleep(1)

    # Exactly one worker may hold the lock.
    assert counter.value == 1
357
358
359
def test_reset(conn):
    """reset() clears the lock so a fresh instance can take and release it."""
    lock = Lock(conn, "foobar")
    lock.reset()
    fresh = Lock(conn, "foobar")
    fresh.acquire(blocking=False)
    fresh.release()
    # The reset instance never held the lock, so releasing it must raise.
    with pytest.raises(NotAcquired):
        lock.release()
366
367
368
def test_reset_all(conn):
    """reset_all() clears every held lock so both names are takeable again."""
    for name in ("foobar1", "foobar2"):
        Lock(conn, name).acquire(blocking=False)
    reset_all(conn)
    # Fresh instances can acquire (and then release) both names.
    fresh = [Lock(conn, name) for name in ("foobar1", "foobar2")]
    for lock in fresh:
        lock.acquire(blocking=False)
    for lock in fresh:
        lock.release()
380
381
382
def test_owner_id(conn):
    """An explicitly supplied id is kept verbatim on the Lock instance."""
    unique_identifier = b"foobar-identifier"
    lock = Lock(conn, "foobar-tok", expire=TIMEOUT/4, id=unique_identifier)
    assert lock.id == unique_identifier
387
388
389
def test_get_owner_id(conn):
    """get_owner_id() reports our own id while we hold the lock."""
    lock = Lock(conn, "foobar-tok")
    lock.acquire()
    assert lock.get_owner_id() == lock.id
    lock.release()
394
395
396
def test_token(conn):
    """The lock's id is stored as the key's value once acquired."""
    lock = Lock(conn, "foobar-tok")
    token = lock.id
    # Nothing stored before the acquire...
    assert conn.get(lock._name) is None
    lock.acquire(blocking=False)
    # ...and exactly our token afterwards.
    assert conn.get(lock._name) == token
402
403
404
def test_bogus_release(conn):
    """Releasing an unheld lock raises; a same-id clone may release it."""
    lock = Lock(conn, "foobar-tok")
    with pytest.raises(NotAcquired):
        lock.release()
    lock.acquire()
    # A second instance sharing the holder's id is allowed to release.
    clone = Lock(conn, "foobar-tok", id=lock.id)
    clone.release()
410
411
412
def test_no_auto_renewal(conn):
    """Without auto_renewal, no renewal interval or thread is set up."""
    lock = Lock(conn, 'lock_renewal', expire=3, auto_renewal=False)
    assert lock._lock_renewal_interval is None
    lock.acquire()
    assert lock._lock_renewal_thread is None, "No lock refresh thread should have been spawned"
417
418
419
def test_auto_renewal_bad_values(conn):
    """auto_renewal without an expire to renew is a ValueError."""
    with pytest.raises(ValueError):
        Lock(conn, 'lock_renewal', expire=None, auto_renewal=True)
422
423
424
def test_auto_renewal(conn):
    """With auto_renewal the key keeps being refreshed until release()."""
    lock = Lock(conn, 'lock_renewal', expire=3, auto_renewal=True)
    lock.acquire()

    renewer = lock._lock_renewal_thread
    assert isinstance(renewer, threading.Thread)
    assert not lock._lock_renewal_stop.is_set()
    assert isinstance(lock._lock_renewal_interval, float)
    # Renewal fires at 2/3 of the 3 second expiry.
    assert lock._lock_renewal_interval == 2

    # Outlive the nominal expiry — the key must still be ours.
    time.sleep(3)
    assert conn.get(lock._name) == lock.id, "Key expired but it should have been getting renewed"

    lock.release()
    # Releasing tears the renewal thread down.
    assert lock._lock_renewal_thread is None
438
439
440
def test_signal_cleanup_on_release(conn):
    """After releasing a lock, the signal key should not remain."""
    foo_lock = Lock(conn, 'foo')
    foo_lock.acquire()
    foo_lock.release()
    # llen returns 0 for an absent key as well as an empty list.
    assert conn.llen('lock-signal:foo') == 0
446
447
448
def test_signal_cleanup_on_reset(conn):
    """After resetting a lock, the signal key should not remain."""
    foo_lock = Lock(conn, 'foo')
    foo_lock.acquire()
    foo_lock.reset()
    # llen returns 0 for an absent key as well as an empty list.
    assert conn.llen('lock-signal:foo') == 0
454
455
456
def test_signal_cleanup_on_reset_all(conn):
    """After resetting all locks, no signal keys should remain."""
    lock = Lock(conn, 'foo')
    lock.acquire()
    reset_all(conn)
    # llen returns 0 for an absent key as well as an empty list.
    assert conn.llen('lock-signal:foo') == 0
462
463
464 View Code Duplication
def test_reset_signalizes(make_conn, make_process):
    """Call to reset() causes LPUSH to signal key, so blocked waiters
    become unblocked."""
    def blocked_worker(flag):
        worker_conn = make_conn()
        # This acquire blocks until the main process resets the lock.
        if Lock(worker_conn, 'lock').acquire():
            flag.value = 1

    unblocked = multiprocessing.Value('B', 0)
    conn = make_conn()
    lock = Lock(conn, 'lock')
    lock.acquire()

    worker = make_process(target=blocked_worker, args=(unblocked,))
    worker.start()
    worker.join(0.5)
    # Forcibly clearing the lock must wake the blocked worker.
    lock.reset()
    worker.join(0.5)

    assert unblocked.value == 1
485
486
487 View Code Duplication
def test_reset_all_signalizes(make_conn, make_process):
    """Call to reset_all() causes LPUSH to all signal keys, so blocked waiters
    become unblocked."""
    def blocked_worker(flag):
        worker_conn = make_conn()
        first = Lock(worker_conn, 'lock1')
        second = Lock(worker_conn, 'lock2')
        # Both acquires block until the main process resets everything.
        if first.acquire() and second.acquire():
            flag.value = 1

    unblocked = multiprocessing.Value('B', 0)
    conn = make_conn()
    lock1 = Lock(conn, 'lock1')
    lock2 = Lock(conn, 'lock2')
    lock1.acquire()
    lock2.acquire()

    worker = make_process(target=blocked_worker, args=(unblocked,))
    worker.start()
    worker.join(0.5)
    # Clearing every lock must wake the worker on both keys.
    reset_all(conn)
    worker.join(0.5)

    assert unblocked.value == 1
511
512
513
def test_auto_renewal_stops_after_gc(conn):
    """Auto renewal stops after lock is garbage collected."""
    lock = Lock(conn, 'spam', auto_renewal=True, expire=1)
    # Remember the key name before dropping our only reference to the lock.
    name = lock._name
    lock.acquire(blocking=True)
    lock_renewal_thread = lock._lock_renewal_thread
    # Drop the last strong reference and force collection so the renewal
    # thread loses its owner.
    del lock
    gc.collect()

    # Give the renewal thread up to ~5 seconds to notice and shut down.
    slept = 0
    interval = 0.1
    while slept <= 5:
        slept += interval
        lock_renewal_thread.join(interval)
        if not lock_renewal_thread.is_alive():
            break

    # Wait past the 1 second expiry so the (no longer renewed) key lapses.
    time.sleep(1.5)

    assert not lock_renewal_thread.is_alive()
    assert conn.get(name) is None
534