Completed · Pull Request — master (#32) · created by unknown · 46s

tests.test_release_from_nonblocking_leaving_garbage()   A

Complexity:  Conditions 3
Size:        Total Lines 6
Duplication: Lines 0, Ratio 0 %

Metric  Value
cc      3
dl      0
loc     6
rs      9.4285

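The metric keys above (cc = cyclomatic complexity, loc = lines of code) look like radon-style metrics; whether radon is the actual engine behind this report is an assumption. Purely as an illustration, a rough local reproduction (assuming radon is installed and that the file lives at the path below, which the report does not state):

from radon.complexity import cc_visit
from radon.raw import analyze

with open('tests/test_redis_lock.py') as fh:  # assumed path, not given in the report
    source = fh.read()

print(analyze(source).loc)  # raw line counts for the whole file
for block in cc_visit(source):
    # per-function cyclomatic complexity, comparable to the "cc" value above
    print(block.name, block.complexity)
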
from __future__ import print_function, division

import os
import platform
import sys
import time
from collections import defaultdict
import multiprocessing

import pytest
from process_tests import TestProcess
from process_tests import dump_on_error
from process_tests import wait_for_strings

from redis import StrictRedis

from redis_lock import AlreadyAcquired
from redis_lock import InterruptableThread
from redis_lock import InvalidTimeout
from redis_lock import Lock
from redis_lock import NotAcquired
from redis_lock import TimeoutTooLarge
from redis_lock import TimeoutNotUsable
from redis_lock import NotExpirable
from redis_lock import reset_all
from conf import HELPER
from conf import TIMEOUT
from conf import UDS_PATH


@pytest.yield_fixture(scope='module')
def redis_server():
    try:
        os.unlink(UDS_PATH)
    except OSError:
        pass
    with TestProcess('redis-server', '--port', '0', '--unixsocket', UDS_PATH) as process:
        with dump_on_error(process.read):
            wait_for_strings(process.read, TIMEOUT, "Running")
            yield process


@pytest.fixture(scope='function')
def make_conn(request, redis_server):
    """Redis connection factory."""
    def make_conn_factory():
        conn_ = StrictRedis(unix_socket_path=UDS_PATH)
        request.addfinalizer(conn_.flushdb)

        return conn_
    return make_conn_factory


@pytest.fixture(scope='function')
def conn(request, make_conn):
    return make_conn()


@pytest.fixture
def make_process(request):
    """Process factory that creates processes which are terminated
    after the test run.
    """
    def make_process_factory(*args, **kwargs):
        process = multiprocessing.Process(*args, **kwargs)
        request.addfinalizer(process.terminate)

        return process

    return make_process_factory


def test_simple(redis_server):
    with TestProcess(sys.executable, HELPER, 'test_simple') as proc:
        with dump_on_error(proc.read):
            name = 'lock:foobar'
            wait_for_strings(
                proc.read, TIMEOUT,
                'Getting %r ...' % name,
                'Got lock for %r.' % name,
                'Releasing %r.' % name,
                'UNLOCK_SCRIPT not cached.',
                'DIED.',
            )


def test_no_block(conn):
    with Lock(conn, "foobar"):
        with TestProcess(sys.executable, HELPER, 'test_no_block') as proc:
            with dump_on_error(proc.read):
                name = 'lock:foobar'
                wait_for_strings(
                    proc.read, TIMEOUT,
                    'Getting %r ...' % name,
                    'Failed to get %r.' % name,
                    'acquire=>False',
                    'DIED.',
                )


def test_timeout(conn):
    with Lock(conn, "foobar"):
        lock = Lock(conn, "foobar")
        assert lock.acquire(timeout=1) is False


def test_timeout_expire(conn):
    lock1 = Lock(conn, "foobar", expire=1)
    lock1.acquire()
    lock2 = Lock(conn, "foobar")
    assert lock2.acquire(timeout=2)


def test_timeout_expire_with_renewal(conn):
    with Lock(conn, "foobar", expire=1, auto_renewal=True):
        lock = Lock(conn, "foobar")
        assert lock.acquire(timeout=2) is False


def test_timeout_acquired(conn):
    with TestProcess(sys.executable, HELPER, 'test_timeout') as proc:
        with dump_on_error(proc.read):
            name = 'lock:foobar'
            wait_for_strings(
                proc.read, TIMEOUT,
                'Getting %r ...' % name,
                'Got lock for %r.' % name,
            )
            lock = Lock(conn, "foobar")
            assert lock.acquire(timeout=2)


def test_not_usable_timeout(conn):
    lock = Lock(conn, "foobar")
    with pytest.raises(TimeoutNotUsable):
        lock.acquire(blocking=False, timeout=1)


def test_expire_less_than_timeout(conn):
    lock = Lock(conn, "foobar", expire=1)
    with pytest.raises(TimeoutTooLarge):
        lock.acquire(blocking=True, timeout=2)


def test_invalid_timeout(conn):
    lock = Lock(conn, "foobar")
    with pytest.raises(InvalidTimeout):
        lock.acquire(blocking=True, timeout=0)

    lock = Lock(conn, "foobar")
    with pytest.raises(InvalidTimeout):
        lock.acquire(blocking=True, timeout=-1)


def test_expire(conn):
    lock = Lock(conn, "foobar", expire=TIMEOUT/4)
    lock.acquire()
    with TestProcess(sys.executable, HELPER, 'test_expire') as proc:
        with dump_on_error(proc.read):
            name = 'lock:foobar'
            wait_for_strings(
                proc.read, TIMEOUT,
                'Getting %r ...' % name,
                'Got lock for %r.' % name,
                'Releasing %r.' % name,
                'UNLOCK_SCRIPT not cached.',
                'DIED.',
            )
    lock = Lock(conn, "foobar")
    try:
        assert lock.acquire(blocking=False) is True
    finally:
        lock.release()


def test_expire_without_timeout(conn):
    first_lock = Lock(conn, 'expire', expire=2)
    second_lock = Lock(conn, 'expire', expire=1)
    first_lock.acquire()
    assert second_lock.acquire(blocking=False) is False
    assert second_lock.acquire() is True
    second_lock.release()


def test_extend(conn):
    name = 'foobar'
    key_name = 'lock:' + name
    with Lock(conn, name, expire=100) as lock:
        assert conn.ttl(key_name) <= 100

        lock.extend(expire=1000)
        assert conn.ttl(key_name) > 100


def test_extend_lock_default_expire(conn):
    name = 'foobar'
    key_name = 'lock:' + name
    with Lock(conn, name, expire=1000) as lock:
        time.sleep(3)
        assert conn.ttl(key_name) <= 997
        lock.extend()
        assert 997 < conn.ttl(key_name) <= 1000


def test_extend_lock_without_expire_fail(conn):
    name = 'foobar'
    with Lock(conn, name) as lock:
        with pytest.raises(NotExpirable):
            lock.extend(expire=1000)

        with pytest.raises(TypeError):
            lock.extend()


def test_extend_another_instance(conn):
    """It is possible to extend a lock using another instance of Lock with the
    same name.
    """
    name = 'foobar'
    key_name = 'lock:' + name
    lock = Lock(conn, name, expire=100)
    lock.acquire()
    assert 0 <= conn.ttl(key_name) <= 100

    another_lock = Lock(conn, name, id=lock.id)
    another_lock.extend(1000)

    assert conn.ttl(key_name) > 100


def test_extend_another_instance_different_id_fail(conn):
    """It is impossible to extend a lock using another instance of Lock with
    the same name, but a different id.
    """
    name = 'foobar'
    key_name = 'lock:' + name
    lock = Lock(conn, name, expire=100)
    lock.acquire()
    assert 0 <= conn.ttl(key_name) <= 100

    another_lock = Lock(conn, name)
    with pytest.raises(NotAcquired):
        another_lock.extend(1000)

    assert conn.ttl(key_name) <= 100
    assert lock.id != another_lock.id


def test_double_acquire(conn):
    lock = Lock(conn, "foobar")
    with lock:
        pytest.raises(RuntimeError, lock.acquire)
        pytest.raises(AlreadyAcquired, lock.acquire)


def test_plain(conn):
    with Lock(conn, "foobar"):
        time.sleep(0.01)


def test_no_overlap(redis_server):
    """
    This test tries to simulate contention: lots of clients trying to acquire the lock at the same time.

    If there were a bug that allowed two clients to hold the lock at the same time, it
    would most likely make this test fail.

    The code here mostly tries to parse out the pid of each process and the time when it got and
    released the lock. If there is overlap (eg: pid1.start < pid2.start < pid1.end) then we
    have a very bad regression on our hands ...

    The subprocess being run (check helper.py) forks a bunch of processes and tries to
    synchronize them (using the built-in sched module) so that they attempt to acquire the lock at the same time.
    """
    with TestProcess(sys.executable, HELPER, 'test_no_overlap') as proc:
        with dump_on_error(proc.read):
            name = 'lock:foobar'
            wait_for_strings(proc.read, 10*TIMEOUT, 'Getting %r ...' % name)
            wait_for_strings(proc.read, 10*TIMEOUT, 'Got lock for %r.' % name)
            wait_for_strings(proc.read, 10*TIMEOUT, 'Releasing %r.' % name)
            wait_for_strings(proc.read, 10*TIMEOUT, 'UNLOCK_SCRIPT not cached.')
            wait_for_strings(proc.read, 10*TIMEOUT, 'DIED.')

            class Event(object):
                pid = start = end = '?'

                def __str__(self):
                    return "Event(%s; %r => %r)" % (self.pid, self.start, self.end)

            events = defaultdict(Event)
            for line in proc.read().splitlines():
                try:
                    # each parsed log line is expected to start with "<pid> <timestamp> ..."
                    pid, timestamp, junk = line.split(' ', 2)
                    pid = int(pid)
                except ValueError:
                    continue
                if 'Got lock for' in junk:
                    events[pid].pid = pid
                    events[pid].start = timestamp
                if 'Releasing' in junk:
                    events[pid].pid = pid
                    events[pid].end = timestamp
            assert len(events) == 125

            # not very smart but we don't have millions of events so it's
            # ok - compare all the events with all the other events:
            for event in events.values():
                for other in events.values():
                    if other is not event:
                        try:
                            if other.start < event.start < other.end or \
                               other.start < event.end < other.end:
                                pytest.fail('%s overlaps %s' % (event, other))
                        except:
                            print("[%s/%s]" % (event, other))
                            raise


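# An illustrative sketch (not part of the original test file) of why the overlap
# predicate in test_no_overlap works: two hold intervals conflict when either
# endpoint of one falls strictly inside the other. With made-up timestamps:
def _overlap_predicate_example():
    a_start, a_end = '10.0', '10.5'   # worker A held the lock from 10.0 to 10.5
    b_start, b_end = '10.3', '10.9'   # worker B acquired at 10.3, before A released
    # the same expression test_no_overlap uses, applied to the toy intervals:
    assert a_start < b_start < a_end or a_start < b_end < a_end
# The comparison is done on strings, which is only sound if the helper logs
# timestamps in a lexicographically ordered (e.g. fixed-width) format.

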
NWORKERS = 125


@pytest.mark.skipif(platform.python_implementation() == 'PyPy', reason="This appears to be way too slow to run on PyPy")
def test_no_overlap2(make_process, make_conn):
    """The second version of the contention test; this one uses multiprocessing."""
    go         = multiprocessing.Event()
    count_lock = multiprocessing.Lock()
    count      = multiprocessing.Value('H', 0)

    def workerfn(go, count_lock, count):
        redis_lock = Lock(make_conn(), 'lock')
        with count_lock:
            count.value += 1

        go.wait()

        if redis_lock.acquire(blocking=True):
            with count_lock:
                count.value += 1

    for _ in range(NWORKERS):
        make_process(target=workerfn, args=(go, count_lock, count)).start()

    # Wait until every worker has reached the point where it is ready to
    # acquire the redis lock.
    while count.value < NWORKERS:
        time.sleep(0.5)

    # From here on "count" counts the workers that successfully acquired
    # the redis lock.
    count.value = 0

    go.set()

    time.sleep(1)

    assert count.value == 1


def test_reset(conn):
    lock = Lock(conn, "foobar")
    lock.reset()
    new_lock = Lock(conn, "foobar")
    new_lock.acquire(blocking=False)
    new_lock.release()
    pytest.raises(NotAcquired, lock.release)


def test_reset_all(conn):
    lock1 = Lock(conn, "foobar1")
    lock2 = Lock(conn, "foobar2")
    lock1.acquire(blocking=False)
    lock2.acquire(blocking=False)
    reset_all(conn)
    lock1 = Lock(conn, "foobar1")
    lock2 = Lock(conn, "foobar2")
    lock1.acquire(blocking=False)
    lock2.acquire(blocking=False)
    lock1.release()
    lock2.release()


def test_owner_id(conn):
    unique_identifier = b"foobar-identifier"
    lock = Lock(conn, "foobar-tok", expire=TIMEOUT/4, id=unique_identifier)
    lock_id = lock.id
    assert lock_id == unique_identifier


def test_get_owner_id(conn):
    lock = Lock(conn, "foobar-tok")
    lock.acquire()
    assert lock.get_owner_id() == lock.id
    lock.release()


def test_token(conn):
    lock = Lock(conn, "foobar-tok")
    tok = lock.id
    assert conn.get(lock._name) is None
    lock.acquire(blocking=False)
    assert conn.get(lock._name) == tok


def test_bogus_release(conn):
    lock = Lock(conn, "foobar-tok")
    pytest.raises(NotAcquired, lock.release)
    lock.acquire()
    lock2 = Lock(conn, "foobar-tok", id=lock.id)
    lock2.release()


def test_no_auto_renewal(conn):
    lock = Lock(conn, 'lock_renewal', expire=3, auto_renewal=False)
    assert lock._lock_renewal_interval is None
    lock.acquire()
    assert lock._lock_renewal_thread is None, "No lock refresh thread should have been spawned"


def test_auto_renewal_bad_values(conn):
    with pytest.raises(ValueError):
        Lock(conn, 'lock_renewal', expire=None, auto_renewal=True)


def test_auto_renewal(conn):
    lock = Lock(conn, 'lock_renewal', expire=3, auto_renewal=True)
    lock.acquire()

    assert isinstance(lock._lock_renewal_thread, InterruptableThread)
    assert not lock._lock_renewal_thread.should_exit
    assert lock._lock_renewal_interval == 2

    time.sleep(3)
    assert conn.get(lock._name) == lock.id, "Key expired but it should have been getting renewed"

    lock.release()
    assert lock._lock_renewal_thread is None


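# A quick reading of the numbers asserted in test_auto_renewal above (inferred from
# the test itself, not from library internals): with expire=3 the renewal interval
# is asserted to be 2, i.e. roughly two thirds of the expiry, so the time.sleep(3)
# spans at least one renewal and the key is expected to still be present afterwards.

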
def test_signal_expiration(conn):
    """The signal key expires one millisecond after the lock is released."""
    lock = Lock(conn, 'signal_expiration')
    lock.acquire()
    lock.release()
    time.sleep(0.002)
    assert conn.llen('lock-signal:signal_expiration') == 0


def test_reset_signalizes(make_conn, make_process):
    """A call to reset() causes an LPUSH to the signal key, so blocked waiters
    become unblocked."""
    def workerfn(unblocked):
        conn = make_conn()
        lock = Lock(conn, 'lock')
        if lock.acquire():
            unblocked.value = 1

    unblocked = multiprocessing.Value('B', 0)
    conn = make_conn()
    lock = Lock(conn, 'lock')
    lock.acquire()

    worker = make_process(target=workerfn, args=(unblocked,))
    worker.start()
    worker.join(0.5)
    lock.reset()
    worker.join(0.5)

    assert unblocked.value == 1


def test_reset_all_signalizes(make_conn, make_process):
    """A call to reset_all() causes an LPUSH to all signal keys, so blocked waiters
    become unblocked."""
    def workerfn(unblocked):
        conn = make_conn()
        lock1 = Lock(conn, 'lock1')
        lock2 = Lock(conn, 'lock2')
        if lock1.acquire() and lock2.acquire():
            unblocked.value = 1

    unblocked = multiprocessing.Value('B', 0)
    conn = make_conn()
    lock1 = Lock(conn, 'lock1')
    lock2 = Lock(conn, 'lock2')
    lock1.acquire()
    lock2.acquire()

    worker = make_process(target=workerfn, args=(unblocked,))
    worker.start()
    worker.join(0.5)
    reset_all(conn)
    worker.join(0.5)

    assert unblocked.value == 1
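
For reference, a minimal usage sketch of the redis_lock API that these tests exercise; the connection details and key names below are placeholders, not taken from conf.py:

from redis import StrictRedis
from redis_lock import Lock, reset_all

conn = StrictRedis()  # placeholder; the tests above connect through a unix socket

# context-manager form, as in test_plain and test_extend
with Lock(conn, "resource-name", expire=100) as lock:
    lock.extend(expire=1000)  # push the expiry further out while still holding the lock

# explicit acquire/release with background renewal, as in test_auto_renewal
lock = Lock(conn, "resource-name", expire=3, auto_renewal=True)
if lock.acquire(blocking=True):
    try:
        pass  # critical section
    finally:
        lock.release()

# clear all held locks, as in test_reset_all
reset_all(conn)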