Completed
Push — master ( 7f27d5...f5db5d )
by Ionel Cristian
58s
created

tests.test_expire_without_timeout()   A

Complexity

Conditions 3

Size

Total Lines 7

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 3
dl 0
loc 7
rs 9.4286
1
from __future__ import print_function
2
3
import os
4
import sys
5
import time
6
from collections import defaultdict
7
import multiprocessing
8
9
import pytest
10
from process_tests import TestProcess
11
from process_tests import dump_on_error
12
from process_tests import wait_for_strings
13
14
from redis import StrictRedis
15
16
from redis_lock import AlreadyAcquired
17
from redis_lock import InterruptableThread
18
from redis_lock import InvalidTimeout
19
from redis_lock import Lock
20
from redis_lock import NotAcquired
21
from redis_lock import TimeoutTooLarge
22
from redis_lock import TimeoutNotUsable
23
from redis_lock import NotExpirable
24
from redis_lock import reset_all
25
from conf import HELPER
26
from conf import TIMEOUT
27
from conf import UDS_PATH
28
29
30
@pytest.yield_fixture
def redis_server(scope='module'):
    """Start a ``redis-server`` bound to the unix socket ``UDS_PATH``.

    A socket file left over at ``UDS_PATH`` is unlinked first. The
    ``TestProcess`` is yielded after ``wait_for_strings`` has been called
    for "Running" on the server output.
    """
    # NOTE(review): ``scope`` is an ordinary keyword default on the function,
    # not an argument of the fixture decorator -- confirm whether a
    # module-scoped fixture was really intended before changing it.
    try:
        os.unlink(UDS_PATH)
    except OSError:
        # Best effort clean-up: ignore a socket file that cannot be removed.
        pass

    server_args = ('redis-server', '--port', '0', '--unixsocket', UDS_PATH)
    with TestProcess(*server_args) as process, dump_on_error(process.read):
        wait_for_strings(process.read, TIMEOUT, "Running")
        yield process
40
41
42
@pytest.fixture(scope='function')
def make_conn(request, redis_server):
    """Redis connection factory.

    Each connection handed out has its ``flushdb`` registered as a finalizer
    on the requesting test.
    """
    def make_conn_factory():
        client = StrictRedis(unix_socket_path=UDS_PATH)
        request.addfinalizer(client.flushdb)
        return client

    return make_conn_factory
51
52
53
@pytest.fixture(scope='function')
def conn(request, make_conn):
    """Return one Redis connection built by the ``make_conn`` factory."""
    connection = make_conn()
    return connection
56
57
58
@pytest.fixture
def make_process(request):
    """Process factory.

    The returned callable forwards its arguments to
    ``multiprocessing.Process`` and registers the new process'
    ``terminate`` method as a finalizer on the requesting test.
    """
    def make_process_factory(*args, **kwargs):
        proc = multiprocessing.Process(*args, **kwargs)
        request.addfinalizer(proc.terminate)
        return proc

    return make_process_factory
70
71
72
def test_simple(redis_server):
    """Run the ``test_simple`` helper scenario and wait for its output."""
    name = 'lock:foobar'
    expected = (
        'Getting %r ...' % name,
        'Got lock for %r.' % name,
        'Releasing %r.' % name,
        'UNLOCK_SCRIPT not cached.',
        'DIED.',
    )
    with TestProcess(sys.executable, HELPER, 'test_simple') as proc:
        with dump_on_error(proc.read):
            wait_for_strings(proc.read, TIMEOUT, *expected)
84
85
86
def test_no_block(conn):
    """Run the ``test_no_block`` helper scenario while "foobar" is held here."""
    name = 'lock:foobar'
    expected = (
        'Getting %r ...' % name,
        'Failed to get %r.' % name,
        'acquire=>False',
        'DIED.',
    )
    with Lock(conn, "foobar"):
        with TestProcess(sys.executable, HELPER, 'test_no_block') as proc:
            with dump_on_error(proc.read):
                wait_for_strings(proc.read, TIMEOUT, *expected)
98
99
100
def test_timeout(conn):
    """A timed acquire must report failure while the lock is already held."""
    with Lock(conn, "foobar"):
        contender = Lock(conn, "foobar")
        # Identity comparison against the singleton (PEP 8) instead of
        # ``== False``; this also matches test_expire_without_timeout.
        assert contender.acquire(timeout=1) is False
104
105
106
def test_timeout_expire(conn):
    """A 2 second timed acquire succeeds against a holder with ``expire=1``."""
    holder = Lock(conn, "foobar", expire=1)
    with holder:
        contender = Lock(conn, "foobar")
        acquired = contender.acquire(timeout=2)
        assert acquired
110
111
112
def test_timeout_expire_with_renewal(conn):
    """A timed acquire must fail while the holder uses ``auto_renewal``.

    The holder is created with ``expire=1`` and ``auto_renewal=True``; the
    contender waits up to 2 seconds and is expected not to get the lock.
    """
    with Lock(conn, "foobar", expire=1, auto_renewal=True):
        contender = Lock(conn, "foobar")
        # Identity comparison against the singleton (PEP 8) instead of
        # ``== False``; this also matches test_expire_without_timeout.
        assert contender.acquire(timeout=2) is False
116
117
118
def test_timeout_acquired(conn):
    """Acquire "foobar" with a timeout after the ``test_timeout`` helper got it."""
    name = 'lock:foobar'
    expected = (
        'Getting %r ...' % name,
        'Got lock for %r.' % name,
    )
    with TestProcess(sys.executable, HELPER, 'test_timeout') as proc:
        with dump_on_error(proc.read):
            wait_for_strings(proc.read, TIMEOUT, *expected)
            contender = Lock(conn, "foobar")
            acquired = contender.acquire(timeout=2)
            assert acquired
129
130
131
def test_not_usable_timeout(conn):
    """Combining ``blocking=False`` with a timeout raises ``TimeoutNotUsable``."""
    candidate = Lock(conn, "foobar")
    pytest.raises(TimeoutNotUsable, candidate.acquire, blocking=False, timeout=1)
135
136
137
def test_expire_less_than_timeout(conn):
    """A timeout larger than the lock's ``expire`` raises ``TimeoutTooLarge``."""
    candidate = Lock(conn, "foobar", expire=1)
    pytest.raises(TimeoutTooLarge, candidate.acquire, blocking=True, timeout=2)
141
142
143
def test_invalid_timeout(conn):
    """Timeouts of 0 and -1 are both rejected with ``InvalidTimeout``."""
    for bad_timeout in (0, -1):
        # A fresh Lock instance is used for every rejected value.
        candidate = Lock(conn, "foobar")
        with pytest.raises(InvalidTimeout):
            candidate.acquire(blocking=True, timeout=bad_timeout)
151
152
153
def test_expire(conn):
    """Run the ``test_expire`` helper against a lock held with an expiry.

    While "foobar" is held here with ``expire=TIMEOUT/4`` the helper process
    is expected to print the full get/release sequence. Afterwards a fresh
    non-blocking acquire from this process must succeed.
    """
    with Lock(conn, "foobar", expire=TIMEOUT/4):
        with TestProcess(sys.executable, HELPER, 'test_expire') as proc:
            with dump_on_error(proc.read):
                name = 'lock:foobar'
                wait_for_strings(
                    proc.read, TIMEOUT,
                    'Getting %r ...' % name,
                    'Got lock for %r.' % name,
                    'Releasing %r.' % name,
                    'UNLOCK_SCRIPT not cached.',
                    'DIED.',
                )
    lock = Lock(conn, "foobar")
    acquired = lock.acquire(blocking=False)
    try:
        # Identity comparison against the singleton instead of ``== True``.
        assert acquired is True
    finally:
        # Only release what was really acquired; releasing an unheld lock
        # would raise and hide the assertion failure above.
        if acquired:
            lock.release()
171
172
173
def test_expire_without_timeout(conn):
    """A blocking acquire without timeout succeeds once the holder expires.

    The holder uses ``expire=2``; the waiter first fails a non-blocking
    acquire and then gets the lock with a plain blocking acquire.
    """
    holder = Lock(conn, 'expire', expire=2)
    waiter = Lock(conn, 'expire', expire=1)
    holder.acquire()

    non_blocking_result = waiter.acquire(blocking=False)
    assert non_blocking_result is False

    blocking_result = waiter.acquire()
    assert blocking_result is True
    waiter.release()
180
181
182
def test_extend(conn):
    """``extend(expire=1000)`` pushes the key's TTL above the initial 100."""
    name = 'foobar'
    key_name = 'lock:%s' % name
    with Lock(conn, name, expire=100) as held:
        ttl_before = conn.ttl(key_name)
        assert ttl_before <= 100

        held.extend(expire=1000)
        ttl_after = conn.ttl(key_name)
        assert ttl_after > 100
190
191
192
def test_extend_lock_default_expire(conn):
    """``extend()`` without arguments brings the TTL back to (997, 1000]."""
    name = 'foobar'
    key_name = 'lock:%s' % name
    with Lock(conn, name, expire=1000) as held:
        # Let a few seconds of the TTL elapse before extending.
        time.sleep(3)
        ttl_before = conn.ttl(key_name)
        assert ttl_before <= 997

        held.extend()
        ttl_after = conn.ttl(key_name)
        assert 997 < ttl_after <= 1000
200
201
202
def test_extend_lock_without_expire_fail(conn):
    """Extending a lock created without ``expire`` fails.

    ``extend(expire=1000)`` must raise ``NotExpirable`` and a bare
    ``extend()`` must raise ``TypeError``.
    """
    name = 'foobar'
    with Lock(conn, name) as held:
        pytest.raises(NotExpirable, held.extend, expire=1000)
        pytest.raises(TypeError, held.extend)
210
211
212
def test_extend_another_instance(conn):
    """A second ``Lock`` instance with the same name and id can extend the
    lock acquired by the first one.
    """
    name = 'foobar'
    key_name = 'lock:%s' % name

    owner = Lock(conn, name, id='spam', expire=100)
    owner.acquire()
    initial_ttl = conn.ttl(key_name)
    assert 0 <= initial_ttl <= 100

    twin = Lock(conn, name, id='spam')
    twin.extend(1000)

    extended_ttl = conn.ttl(key_name)
    assert extended_ttl > 100
226
227
228
def test_extend_another_instance_different_id_fail(conn):
    """A ``Lock`` instance with the same name but another id cannot extend
    the lock: ``NotAcquired`` is raised and the TTL stays at most 100.
    """
    name = 'foobar'
    key_name = 'lock:%s' % name

    owner = Lock(conn, name, expire=100, id='spam')
    owner.acquire()
    initial_ttl = conn.ttl(key_name)
    assert 0 <= initial_ttl <= 100

    stranger = Lock(conn, name, id='eggs')
    pytest.raises(NotAcquired, stranger.extend, 1000)

    final_ttl = conn.ttl(key_name)
    assert final_ttl <= 100
243
244
245
def test_double_acquire(conn):
    """Acquiring an already held instance again raises.

    The failure is checked twice: once as ``RuntimeError`` and once as
    ``AlreadyAcquired``.
    """
    held = Lock(conn, "foobar")
    with held:
        with pytest.raises(RuntimeError):
            held.acquire()
        with pytest.raises(AlreadyAcquired):
            held.acquire()
250
251
252
def test_plain(conn):
    """Hold the lock briefly through the context-manager protocol."""
    held = Lock(conn, "foobar")
    with held:
        time.sleep(0.01)
255
256
257
def test_no_overlap(redis_server):
    """
    Simulate contention: lots of clients trying to acquire at the same time.

    If a bug allowed two clients to hold the lock at the same time, this
    test would most likely catch it.

    The code below parses, from the helper's output, the pid of each process
    and the times at which it got and released the lock. If there is overlap
    (eg: pid1.start < pid2.start < pid1.end) then we have a very bad
    regression on our hands ...

    The subprocess being run (check helper.py) will fork a bunch of
    processes and will try to synchronize them (using the builtin sched) so
    that they try to acquire the lock at the same time.
    """
    name = 'lock:foobar'
    # Each string is waited for separately, in this order.
    expected_in_order = (
        'Getting %r ...' % name,
        'Got lock for %r.' % name,
        'Releasing %r.' % name,
        'UNLOCK_SCRIPT not cached.',
        'DIED.',
    )
    with TestProcess(sys.executable, HELPER, 'test_no_overlap') as proc:
        with dump_on_error(proc.read):
            for needle in expected_in_order:
                wait_for_strings(proc.read, 10*TIMEOUT, needle)

            class Event(object):
                # '?' marks a value that was never seen in the output.
                pid = start = end = '?'

                def __str__(self):
                    return "Event(%s; %r => %r)" % (self.pid, self.start, self.end)

            events = defaultdict(Event)
            for line in proc.read().splitlines():
                # Lines of interest look like "<pid> <time> <message>";
                # anything that does not fit that shape is skipped.
                try:
                    raw_pid, stamp, message = line.split(' ', 2)
                    pid = int(raw_pid)
                except ValueError:
                    continue
                if 'Got lock for' in message:
                    record = events[pid]
                    record.pid = pid
                    record.start = stamp
                if 'Releasing' in message:
                    record = events[pid]
                    record.pid = pid
                    record.end = stamp
            assert len(events) == 125

            # not very smart but we don't have millions of events so it's
            # ok - compare all the events with all the other events:
            for event in events.values():
                for other in events.values():
                    if other is event:
                        continue
                    try:
                        if (other.start < event.start < other.end or
                                other.start < event.end < other.end):
                            pytest.fail('%s overlaps %s' % (event, other))
                    except BaseException:
                        # Show the offending pair, then let the error through.
                        print("[%s/%s]" % (event, other))
                        raise
313
314
315
# Number of worker processes started by test_no_overlap2.
NWORKERS = 125


def test_no_overlap2(make_process, make_conn):
    """The second version of contention test, that uses multiprocessing.

    ``NWORKERS`` processes are started; each one reports readiness through a
    shared counter, waits for the ``go`` event and then tries a blocking
    acquire of the same redis lock. One second after ``go`` is set exactly
    one worker is expected to have counted a successful acquire.
    """
    go = multiprocessing.Event()
    count_lock = multiprocessing.Lock()
    count = multiprocessing.Value('H', 0)

    def workerfn(go, count_lock, count):
        redis_lock = Lock(make_conn(), 'lock')
        # Announce that this worker is ready to compete for the lock.
        with count_lock:
            count.value += 1

        go.wait()

        if not redis_lock.acquire(blocking=True):
            return
        with count_lock:
            count.value += 1

    for _ in range(NWORKERS):
        worker = make_process(target=workerfn, args=(go, count_lock, count))
        worker.start()

    # Wait until all workers will come to point when they are ready to acquire
    # the redis lock.
    while count.value < NWORKERS:
        time.sleep(0.5)

    # Then "count" will be used as counter of workers, which acquired
    # redis-lock with success.
    count.value = 0

    go.set()

    time.sleep(1)

    assert count.value == 1
351
352
353
def test_reset(conn):
    """After ``reset()`` another instance can acquire and release "foobar"."""
    with Lock(conn, "foobar") as held:
        held.reset()
        fresh = Lock(conn, "foobar")
        fresh.acquire(blocking=False)
        fresh.release()
359
360
361
def test_reset_all(conn):
    """After ``reset_all`` both locks can be acquired and released again."""
    names = ("foobar1", "foobar2")

    stale_locks = [Lock(conn, lock_name) for lock_name in names]
    for stale in stale_locks:
        stale.acquire(blocking=False)

    reset_all(conn)

    fresh_locks = [Lock(conn, lock_name) for lock_name in names]
    for fresh in fresh_locks:
        fresh.acquire(blocking=False)
    for fresh in fresh_locks:
        fresh.release()
373
374
375
def test_owner_id(conn):
    """An explicit ``id`` is exposed as ``lock.id`` and by ``get_owner_id()``."""
    unique_identifier = b"foobar-identifier"
    lock = Lock(conn, "foobar-tok", expire=TIMEOUT/4, id=unique_identifier)
    assert lock.id == unique_identifier

    lock.acquire(blocking=False)
    owner = lock.get_owner_id()
    assert owner == unique_identifier
    lock.release()
383
384
385
def test_token(conn):
    """The lock key is absent before acquire and holds ``lock.id`` after it."""
    lock = Lock(conn, "foobar-tok")
    token = lock.id

    stored_before = conn.get(lock._name)
    assert stored_before is None

    lock.acquire(blocking=False)
    stored_after = conn.get(lock._name)
    assert stored_after == token
391
392
393
def test_bogus_release(conn):
    """Releasing an unacquired lock raises ``NotAcquired``; the same call
    with ``force=True`` is then made without expecting an exception.
    """
    never_acquired = Lock(conn, "foobar-tok")
    with pytest.raises(NotAcquired):
        never_acquired.release()
    never_acquired.release(force=True)
397
398
399
def test_release_from_nonblocking_leaving_garbage(conn):
    """Ten acquire/release rounds each leave exactly one signal-list entry."""
    signal_key = 'lock-signal:release_from_nonblocking'
    for _ in range(10):
        lock = Lock(conn, 'release_from_nonblocking')
        lock.acquire(blocking=False)
        lock.release()
        pending_signals = conn.llen(signal_key)
        assert pending_signals == 1
405
406
407
def test_no_auto_renewal(conn):
    """With ``auto_renewal=False`` no renewal interval or thread is set up."""
    lock = Lock(conn, 'lock_renewal', expire=3, auto_renewal=False)
    assert lock._lock_renewal_interval is None

    lock.acquire()
    renewal_thread = lock._lock_renewal_thread
    assert renewal_thread is None, "No lock refresh thread should have been spawned"
412
413
414
def test_auto_renewal_bad_values(conn):
    """``auto_renewal=True`` together with ``expire=None`` raises ``ValueError``."""
    pytest.raises(ValueError, Lock, conn, 'lock_renewal', expire=None, auto_renewal=True)
417
418
419
def test_auto_renewal(conn):
    """With ``auto_renewal=True`` the key is still present after ``expire``.

    Checks the renewal thread type, its ``should_exit`` flag and the renewal
    interval after acquiring, sleeps for the full ``expire`` period, checks
    the key still holds ``lock.id``, and finally checks that releasing
    clears the renewal thread attribute.
    """
    lock = Lock(conn, 'lock_renewal', expire=3, auto_renewal=True)
    lock.acquire()

    renewal_thread = lock._lock_renewal_thread
    assert isinstance(renewal_thread, InterruptableThread)
    assert not renewal_thread.should_exit
    assert lock._lock_renewal_interval == 2

    time.sleep(3)
    stored = conn.get(lock._name)
    assert stored == lock.id, "Key expired but it should have been getting renewed"

    lock.release()
    assert lock._lock_renewal_thread is None
432
433
434
def test_signal_expiration(conn):
    """Two seconds after a release the signal list must be empty."""
    lock = Lock(conn, 'signal_expiration')
    lock.acquire()
    lock.release()

    time.sleep(2)
    remaining_signals = conn.llen('lock-signal:signal_expiration')
    assert remaining_signals == 0
441
442
443
def test_reset_signalizes(make_conn, make_process):
    """A waiter blocked in ``acquire()`` gets the lock after ``reset()``.

    The worker process sets the shared flag once its blocking acquire
    returns a true value.
    """
    def workerfn(unblocked):
        worker_conn = make_conn()
        worker_lock = Lock(worker_conn, 'lock')
        if worker_lock.acquire():
            unblocked.value = 1

    unblocked = multiprocessing.Value('B', 0)
    holder = Lock(make_conn(), 'lock')
    holder.acquire()

    worker = make_process(target=workerfn, args=(unblocked,))
    worker.start()
    # Give the worker time to block before the reset, and time to react after.
    worker.join(0.5)
    holder.reset()
    worker.join(0.5)

    assert unblocked.value == 1
464
465
466
def test_reset_all_signalizes(make_conn, make_process):
    """A waiter blocked on two held locks gets both after ``reset_all()``.

    The worker process sets the shared flag once both of its blocking
    acquires have returned a true value.
    """
    def workerfn(unblocked):
        worker_conn = make_conn()
        first = Lock(worker_conn, 'lock1')
        second = Lock(worker_conn, 'lock2')
        if first.acquire() and second.acquire():
            unblocked.value = 1

    unblocked = multiprocessing.Value('B', 0)
    conn = make_conn()
    held_first = Lock(conn, 'lock1')
    held_second = Lock(conn, 'lock2')
    held_first.acquire()
    held_second.acquire()

    worker = make_process(target=workerfn, args=(unblocked,))
    worker.start()
    # Give the worker time to block before the reset, and time to react after.
    worker.join(0.5)
    reset_all(conn)
    worker.join(0.5)

    assert unblocked.value == 1
490