Completed
Push — master ( 15749c...80666e )
by Ionel Cristian
58s
created

tests.test_get_owner_id()   A

Complexity

Conditions 2

Size

Total Lines 5

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 2
dl 0
loc 5
rs 9.4286
1
from __future__ import print_function, division
2
3
import os
4
import platform
5
import sys
6
import time
7
from collections import defaultdict
8
import multiprocessing
9
10
import pytest
11
from process_tests import TestProcess
12
from process_tests import dump_on_error
13
from process_tests import wait_for_strings
14
15
from redis import StrictRedis
16
17
from redis_lock import AlreadyAcquired
18
from redis_lock import InterruptableThread
19
from redis_lock import InvalidTimeout
20
from redis_lock import Lock
21
from redis_lock import NotAcquired
22
from redis_lock import TimeoutTooLarge
23
from redis_lock import TimeoutNotUsable
24
from redis_lock import NotExpirable
25
from redis_lock import reset_all
26
from conf import HELPER
27
from conf import TIMEOUT
28
from conf import UDS_PATH
29
30
31
@pytest.yield_fixture
def redis_server(scope='module'):
    """Launch ``redis-server`` on the ``UDS_PATH`` unix socket and yield its process.

    NOTE(review): ``scope`` is an ordinary defaulted argument of this
    function, not an argument of the fixture decorator, so it presumably
    does not set the fixture scope -- confirm what was intended before
    changing it.
    """
    # Remove any existing file at UDS_PATH; a missing file is not an error.
    try:
        os.unlink(UDS_PATH)
    except OSError:
        pass

    server_args = ('redis-server', '--port', '0', '--unixsocket', UDS_PATH)
    with TestProcess(*server_args) as process:
        with dump_on_error(process.read):
            # Do not hand the server out until its output contains "Running".
            wait_for_strings(process.read, TIMEOUT, "Running")
            yield process
41
42
43
@pytest.fixture(scope='function')
def make_conn(request, redis_server):
    """Return a factory that opens Redis connections over ``UDS_PATH``.

    Each connection the factory creates registers its ``flushdb`` method as
    a finalizer on the requesting test.
    """
    def make_conn_factory():
        connection = StrictRedis(unix_socket_path=UDS_PATH)
        request.addfinalizer(connection.flushdb)
        return connection

    return make_conn_factory
52
53
54
@pytest.fixture(scope='function')
def conn(request, make_conn):
    """Provide one Redis connection built by the ``make_conn`` factory."""
    connection = make_conn()
    return connection
57
58
59
@pytest.fixture
def make_process(request):
    """Return a factory for ``multiprocessing.Process`` objects.

    Every process created through the factory has ``terminate`` registered
    as a finalizer on the requesting test. The factory does not start the
    process; that is left to the caller.
    """
    def make_process_factory(*args, **kwargs):
        child = multiprocessing.Process(*args, **kwargs)
        request.addfinalizer(child.terminate)
        return child

    return make_process_factory
71
72
73
def test_simple(redis_server):
    """Run the ``test_simple`` helper and wait for its expected output lines."""
    name = 'lock:foobar'
    expected_output = (
        'Getting %r ...' % name,
        'Got lock for %r.' % name,
        'Releasing %r.' % name,
        'UNLOCK_SCRIPT not cached.',
        'DIED.',
    )
    with TestProcess(sys.executable, HELPER, 'test_simple') as proc:
        with dump_on_error(proc.read):
            wait_for_strings(proc.read, TIMEOUT, *expected_output)
85
86
87
def test_no_block(conn):
    """While this test holds the lock, the helper must report a failed acquire."""
    name = 'lock:foobar'
    expected_output = (
        'Getting %r ...' % name,
        'Failed to get %r.' % name,
        'acquire=>False',
        'DIED.',
    )
    with Lock(conn, "foobar"):
        with TestProcess(sys.executable, HELPER, 'test_no_block') as proc:
            with dump_on_error(proc.read):
                wait_for_strings(proc.read, TIMEOUT, *expected_output)
99
100
101
def test_timeout(conn):
    """Acquiring with a timeout returns False while another instance holds the lock."""
    with Lock(conn, "foobar"):
        contender = Lock(conn, "foobar")
        acquired = contender.acquire(timeout=1)
        assert acquired == False
105
106
107
def test_timeout_expire(conn):
    """A lock created with ``expire=1`` can be taken by another instance within 2 seconds."""
    holder = Lock(conn, "foobar", expire=1)
    holder.acquire()

    contender = Lock(conn, "foobar")
    acquired = contender.acquire(timeout=2)
    assert acquired
112
113
114
def test_timeout_expire_with_renewal(conn):
    """With ``auto_renewal=True`` the held lock is not obtainable within 2 seconds."""
    with Lock(conn, "foobar", expire=1, auto_renewal=True):
        contender = Lock(conn, "foobar")
        acquired = contender.acquire(timeout=2)
        assert acquired == False
118
119
120
def test_timeout_acquired(conn):
    """After the helper reports getting the lock, acquiring here with ``timeout=2`` succeeds."""
    name = 'lock:foobar'
    with TestProcess(sys.executable, HELPER, 'test_timeout') as proc:
        with dump_on_error(proc.read):
            helper_output = (
                'Getting %r ...' % name,
                'Got lock for %r.' % name,
            )
            wait_for_strings(proc.read, TIMEOUT, *helper_output)

            contender = Lock(conn, "foobar")
            acquired = contender.acquire(timeout=2)
            assert acquired
131
132
133
def test_not_usable_timeout(conn):
    """Passing a timeout to a non-blocking acquire raises ``TimeoutNotUsable``."""
    candidate = Lock(conn, "foobar")
    pytest.raises(TimeoutNotUsable, candidate.acquire, blocking=False, timeout=1)
137
138
139
def test_expire_less_than_timeout(conn):
    """A timeout of 2 on a lock with ``expire=1`` raises ``TimeoutTooLarge``."""
    candidate = Lock(conn, "foobar", expire=1)
    pytest.raises(TimeoutTooLarge, candidate.acquire, blocking=True, timeout=2)
143
144
145
def test_invalid_timeout(conn):
    """Timeouts of 0 and -1 on a blocking acquire raise ``InvalidTimeout``."""
    for bad_timeout in (0, -1):
        # A fresh Lock instance is used for every rejected value.
        candidate = Lock(conn, "foobar")
        with pytest.raises(InvalidTimeout):
            candidate.acquire(blocking=True, timeout=bad_timeout)
153
154
155
def test_expire(conn):
    """A lock held with ``expire=TIMEOUT/4`` is taken over by the helper, then re-acquired here."""
    expiring = Lock(conn, "foobar", expire=TIMEOUT / 4)
    expiring.acquire()

    name = 'lock:foobar'
    expected_output = (
        'Getting %r ...' % name,
        'Got lock for %r.' % name,
        'Releasing %r.' % name,
        'UNLOCK_SCRIPT not cached.',
        'DIED.',
    )
    with TestProcess(sys.executable, HELPER, 'test_expire') as proc:
        with dump_on_error(proc.read):
            wait_for_strings(proc.read, TIMEOUT, *expected_output)

    # Once the helper has finished, a new instance must get the lock at once.
    fresh = Lock(conn, "foobar")
    try:
        acquired = fresh.acquire(blocking=False)
        assert acquired == True
    finally:
        fresh.release()
174
175
176
def test_expire_without_timeout(conn):
    """A blocking acquire without timeout succeeds against a lock created with ``expire=2``."""
    holder = Lock(conn, 'expire', expire=2)
    waiter = Lock(conn, 'expire', expire=1)

    holder.acquire()

    # Non-blocking attempt fails while the first lock is held ...
    immediate = waiter.acquire(blocking=False)
    assert immediate is False

    # ... whereas the blocking attempt is expected to return True.
    eventual = waiter.acquire()
    assert eventual is True

    waiter.release()
183
184
185
def test_extend(conn):
    """``extend(expire=1000)`` raises the key's TTL above the initial 100."""
    name = 'foobar'
    redis_key = 'lock:' + name

    with Lock(conn, name, expire=100) as held:
        ttl_before = conn.ttl(redis_key)
        assert ttl_before <= 100

        held.extend(expire=1000)

        ttl_after = conn.ttl(redis_key)
        assert ttl_after > 100
193
194
195
def test_extend_lock_default_expire(conn):
    """``extend()`` without arguments brings the TTL back into the (997, 1000] range."""
    name = 'foobar'
    redis_key = 'lock:' + name

    with Lock(conn, name, expire=1000) as held:
        # Let three seconds of the TTL elapse before extending.
        time.sleep(3)
        ttl_before = conn.ttl(redis_key)
        assert ttl_before <= 997

        held.extend()

        ttl_after = conn.ttl(redis_key)
        assert 997 < ttl_after <= 1000
203
204
205
def test_extend_lock_without_expire_fail(conn):
    """Extending a lock created without ``expire`` fails.

    ``extend(expire=1000)`` raises ``NotExpirable`` and ``extend()`` raises
    ``TypeError``.
    """
    name = 'foobar'
    with Lock(conn, name) as held:
        pytest.raises(NotExpirable, held.extend, expire=1000)
        pytest.raises(TypeError, held.extend)
213
214
215
def test_extend_another_instance(conn):
    """A second ``Lock`` with the same name and the same id can extend the lock."""
    name = 'foobar'
    redis_key = 'lock:' + name

    original = Lock(conn, name, expire=100)
    original.acquire()
    initial_ttl = conn.ttl(redis_key)
    assert 0 <= initial_ttl <= 100

    # The twin shares the owner id of the instance that acquired the lock.
    twin = Lock(conn, name, id=original.id)
    twin.extend(1000)

    extended_ttl = conn.ttl(redis_key)
    assert extended_ttl > 100
229
230
231
def test_extend_another_instance_different_id_fail(conn):
    """A second ``Lock`` with the same name but a different id cannot extend the lock."""
    name = 'foobar'
    redis_key = 'lock:' + name

    original = Lock(conn, name, expire=100)
    original.acquire()
    initial_ttl = conn.ttl(redis_key)
    assert 0 <= initial_ttl <= 100

    stranger = Lock(conn, name)
    pytest.raises(NotAcquired, stranger.extend, 1000)

    # The failed extend must leave the TTL untouched.
    final_ttl = conn.ttl(redis_key)
    assert final_ttl <= 100
    assert original.id != stranger.id
247
248
249
def test_double_acquire(conn):
    """Re-acquiring a held lock raises an error matching both ``RuntimeError`` and ``AlreadyAcquired``."""
    held = Lock(conn, "foobar")
    with held:
        with pytest.raises(RuntimeError):
            held.acquire()
        with pytest.raises(AlreadyAcquired):
            held.acquire()
254
255
256
def test_plain(conn):
    """Hold the lock briefly through the context manager and release it."""
    plain_lock = Lock(conn, "foobar")
    with plain_lock:
        time.sleep(0.01)
259
260
261
def test_no_overlap(redis_server):
    """
    This test tries to simulate contention: lots of clients trying to acquire at the same time.

    If there were a bug that allowed two clients to hold the lock at the same time, it
    would most likely make this test fail.

    The code here mostly parses out the pid of each process and the time when it got and
    released the lock. If there is overlap (eg: pid1.start < pid2.start < pid1.end) then we
    have a very bad regression on our hands ...

    The subprocess being run (check helper.py) will fork a bunch of processes and will try to
    synchronize them (using the builtin sched) so they try to acquire the lock at the same time.
    """
    class Event(object):
        # Placeholders until the matching output line has been parsed.
        pid = '?'
        start = '?'
        end = '?'

        def __str__(self):
            return "Event(%s; %r => %r)" % (self.pid, self.start, self.end)

    name = 'lock:foobar'
    milestones = (
        'Getting %r ...' % name,
        'Got lock for %r.' % name,
        'Releasing %r.' % name,
        'UNLOCK_SCRIPT not cached.',
        'DIED.',
    )

    with TestProcess(sys.executable, HELPER, 'test_no_overlap') as proc:
        with dump_on_error(proc.read):
            # Wait for each milestone separately, each with its own budget.
            for milestone in milestones:
                wait_for_strings(proc.read, 10*TIMEOUT, milestone)

            events = defaultdict(Event)
            for line in proc.read().splitlines():
                # Expected shape: "<pid> <timestamp> <message>"; anything
                # that does not fit is skipped.
                try:
                    pid, stamp, message = line.split(' ', 2)
                    pid = int(pid)
                except ValueError:
                    continue
                if 'Got lock for' in message:
                    record = events[pid]
                    record.pid = pid
                    record.start = stamp
                if 'Releasing' in message:
                    record = events[pid]
                    record.pid = pid
                    record.end = stamp
            assert len(events) == 125

            # not very smart but we don't have millions of events so it's
            # ok - compare all the events with all the other events:
            for event in events.values():
                for other in events.values():
                    if other is event:
                        continue
                    # Bare except on purpose: the offending pair is printed
                    # for whatever is raised here, then re-raised unchanged.
                    try:
                        starts_inside = other.start < event.start < other.end
                        if starts_inside or other.start < event.end < other.end:
                            pytest.fail('%s overlaps %s' % (event, other))
                    except:
                        print("[%s/%s]" % (event, other))
                        raise
317
318
319
# Number of worker processes started by test_no_overlap2.
NWORKERS = 125
320
321
@pytest.mark.skipif(platform.python_implementation() == 'PyPy', reason="This appears to be way too slow to run on PyPy")
def test_no_overlap2(make_process, make_conn):
    """Second contention test, driven by ``multiprocessing`` workers.

    ``NWORKERS`` processes are released at once and, one second later,
    exactly one of them must have reported acquiring the redis lock.
    """
    go = multiprocessing.Event()
    count_lock = multiprocessing.Lock()
    count = multiprocessing.Value('H', 0)

    def workerfn(go, count_lock, count):
        contended = Lock(make_conn(), 'lock')

        # First use of the counter: report that this worker is ready.
        with count_lock:
            count.value += 1

        go.wait()

        # Second use of the counter: report a successful acquisition.
        if contended.acquire(blocking=True):
            with count_lock:
                count.value += 1

    for _ in range(NWORKERS):
        worker = make_process(target=workerfn, args=(go, count_lock, count))
        worker.start()

    # Poll until every worker has reached the point where it is ready to
    # acquire the redis lock.
    while count.value < NWORKERS:
        time.sleep(0.5)

    # From here on "count" is the number of workers that acquired the
    # redis lock successfully.
    count.value = 0

    go.set()

    time.sleep(1)

    assert count.value == 1
356
357
358
def test_reset(conn):
    """After ``reset()``, a new instance can acquire and release; the reset instance cannot release."""
    stale = Lock(conn, "foobar")
    stale.reset()

    replacement = Lock(conn, "foobar")
    replacement.acquire(blocking=False)
    replacement.release()

    with pytest.raises(NotAcquired):
        stale.release()
365
366
367
def test_reset_all(conn):
    """After ``reset_all``, new instances of both locks can be acquired and released."""
    names = ("foobar1", "foobar2")

    held = [Lock(conn, lock_name) for lock_name in names]
    for lock in held:
        lock.acquire(blocking=False)

    reset_all(conn)

    fresh = [Lock(conn, lock_name) for lock_name in names]
    for lock in fresh:
        lock.acquire(blocking=False)
    for lock in fresh:
        lock.release()
379
380
381
def test_owner_id(conn):
    """An explicit ``id`` passed to ``Lock`` is exposed unchanged as ``lock.id``."""
    expected_id = b"foobar-identifier"
    owned = Lock(conn, "foobar-tok", expire=TIMEOUT / 4, id=expected_id)
    assert owned.id == expected_id
386
387
388
def test_get_owner_id(conn):
    """While the lock is held, ``get_owner_id()`` equals the holder's ``id``."""
    held = Lock(conn, "foobar-tok")
    held.acquire()
    owner = held.get_owner_id()
    assert owner == held.id
    held.release()
393
394
395
def test_token(conn):
    """The lock key is absent before acquiring and holds the lock id afterwards."""
    candidate = Lock(conn, "foobar-tok")
    token = candidate.id

    stored_before = conn.get(candidate._name)
    assert stored_before is None

    candidate.acquire(blocking=False)

    stored_after = conn.get(candidate._name)
    assert stored_after == token
401
402
403
def test_bogus_release(conn):
    """Releasing an unheld lock raises ``NotAcquired``; an instance sharing the id can release."""
    first = Lock(conn, "foobar-tok")
    with pytest.raises(NotAcquired):
        first.release()

    first.acquire()

    # A second instance built with the same id releases the held lock.
    twin = Lock(conn, "foobar-tok", id=first.id)
    twin.release()
409
410
411
def test_release_from_nonblocking_leaving_garbage(conn):
    """Ten acquire/release cycles each leave the signal list at length 1."""
    signal_key = 'lock-signal:release_from_nonblocking'
    for _ in range(10):
        cycle_lock = Lock(conn, 'release_from_nonblocking')
        cycle_lock.acquire(blocking=False)
        cycle_lock.release()

        pending_signals = conn.llen(signal_key)
        assert pending_signals == 1
417
418
419
def test_no_auto_renewal(conn):
    """With ``auto_renewal=False`` there is no renewal interval and no renewal thread."""
    plain = Lock(conn, 'lock_renewal', expire=3, auto_renewal=False)
    assert plain._lock_renewal_interval is None

    plain.acquire()
    renewal_thread = plain._lock_renewal_thread
    assert renewal_thread is None, "No lock refresh thread should have been spawned"
424
425
426
def test_auto_renewal_bad_values(conn):
    """``auto_renewal=True`` combined with ``expire=None`` raises ``ValueError``."""
    pytest.raises(ValueError, Lock, conn, 'lock_renewal', expire=None, auto_renewal=True)
429
430
431
def test_auto_renewal(conn):
    """An auto-renewing lock with ``expire=3`` still owns its key after sleeping 3 seconds."""
    renewing = Lock(conn, 'lock_renewal', expire=3, auto_renewal=True)
    renewing.acquire()

    renewal_thread = renewing._lock_renewal_thread
    assert isinstance(renewal_thread, InterruptableThread)
    assert not renewal_thread.should_exit
    assert renewing._lock_renewal_interval == 2

    # Sleep for the full expire time; the key must still be present after it.
    time.sleep(3)
    stored = conn.get(renewing._name)
    assert stored == renewing.id, "Key expired but it should have been getting renewed"

    renewing.release()
    assert renewing._lock_renewal_thread is None
444
445
446
def test_signal_expiration(conn):
    """The signal key is empty two seconds after the lock has been released."""
    signalled = Lock(conn, 'signal_expiration')
    signalled.acquire()
    signalled.release()

    time.sleep(2)

    remaining = conn.llen('lock-signal:signal_expiration')
    assert remaining == 0
453
454
455
def test_reset_signalizes(make_conn, make_process):
    """A call to reset() causes an LPUSH to the signal key, so blocked
    waiters become unblocked."""
    def workerfn(unblocked):
        worker_lock = Lock(make_conn(), 'lock')
        if worker_lock.acquire():
            unblocked.value = 1

    # Shared flag the worker sets once its blocking acquire returns truthy.
    unblocked = multiprocessing.Value('B', 0)

    holder = Lock(make_conn(), 'lock')
    holder.acquire()

    worker = make_process(target=workerfn, args=(unblocked,))
    worker.start()
    # Wait up to half a second on the worker before resetting the lock.
    worker.join(0.5)
    holder.reset()
    worker.join(0.5)

    assert unblocked.value == 1
476
477
478
def test_reset_all_signalizes(make_conn, make_process):
    """A call to reset_all() causes an LPUSH to all signal keys, so blocked
    waiters become unblocked."""
    def workerfn(unblocked):
        worker_conn = make_conn()
        first = Lock(worker_conn, 'lock1')
        second = Lock(worker_conn, 'lock2')
        if first.acquire() and second.acquire():
            unblocked.value = 1

    # Shared flag the worker sets once both blocking acquires return truthy.
    unblocked = multiprocessing.Value('B', 0)

    conn = make_conn()
    held_first = Lock(conn, 'lock1')
    held_second = Lock(conn, 'lock2')
    held_first.acquire()
    held_second.acquire()

    worker = make_process(target=workerfn, args=(unblocked,))
    worker.start()
    # Wait up to half a second on the worker before resetting every lock.
    worker.join(0.5)
    reset_all(conn)
    worker.join(0.5)

    assert unblocked.value == 1
502