Completed
Push — master ( 949a86...c5f0ee )
by Ionel Cristian
01:04
created

tests.test_extend_another_instance_different_id_fail()   A

Complexity

Conditions 4

Size

Total Lines 15

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 4
dl 0
loc 15
rs 9.2
1
from __future__ import print_function
2
3
import os
4
import sys
5
import time
6
from collections import defaultdict
7
import multiprocessing
8
9
import pytest
10
from process_tests import TestProcess
11
from process_tests import dump_on_error
12
from process_tests import wait_for_strings
13
14
from redis import StrictRedis
15
16
from redis_lock import AlreadyAcquired
17
from redis_lock import InterruptableThread
18
from redis_lock import InvalidTimeout
19
from redis_lock import Lock
20
from redis_lock import NotAcquired
21
from redis_lock import TimeoutTooLarge
22
from redis_lock import TimeoutNotUsable
23
from redis_lock import NotExpirable
24
from redis_lock import reset_all
25
from conf import HELPER
26
from conf import TIMEOUT
27
from conf import UDS_PATH
28
29
30
@pytest.yield_fixture(scope='module')
def redis_server():
    """Start a ``redis-server`` listening on a unix socket; shared per module.

    Fix: ``scope='module'`` used to be a function default argument, which
    pytest never reads — the scope must be passed to the fixture decorator.

    Yields the TestProcess wrapping the server once it logs "Running".
    """
    try:
        # Remove a stale socket from a previous (crashed) run; absence is fine.
        os.unlink(UDS_PATH)
    except OSError:
        pass
    with TestProcess('redis-server', '--port', '0', '--unixsocket', UDS_PATH) as process:
        with dump_on_error(process.read):
            wait_for_strings(process.read, TIMEOUT, "Running")
            yield process
40
41
42
@pytest.fixture(scope='function')
def make_conn(request, redis_server):
    """Redis connection factory."""
    def factory():
        # Each connection flushes the database when the test is torn down.
        connection = StrictRedis(unix_socket_path=UDS_PATH)
        request.addfinalizer(connection.flushdb)
        return connection

    return factory
51
52
53
@pytest.fixture(scope='function')
def conn(make_conn):
    """A fresh Redis connection for one test (flushed afterwards).

    Fix: dropped the unused ``request`` parameter — finalization is already
    registered inside ``make_conn``, so the extra fixture dependency was dead.
    """
    return make_conn()
56
57
58
@pytest.fixture
def make_process(request):
    """Process factory, that makes processes, that terminate themselves
    after a test run.
    """
    def factory(*args, **kwargs):
        # Terminate on teardown so a hung worker cannot outlive its test.
        proc = multiprocessing.Process(*args, **kwargs)
        request.addfinalizer(proc.terminate)
        return proc

    return factory
70
71
72
def test_simple(redis_server):
    """The helper acquires, holds and releases the lock end-to-end."""
    lock_name = 'lock:foobar'
    with TestProcess(sys.executable, HELPER, 'test_simple') as process:
        with dump_on_error(process.read):
            wait_for_strings(
                process.read, TIMEOUT,
                'Getting %r ...' % lock_name,
                'Got lock for %r.' % lock_name,
                'Releasing %r.' % lock_name,
                'UNLOCK_SCRIPT not cached.',
                'DIED.',
            )
84
85
86
def test_no_block(conn):
    """A non-blocking acquire in another process fails while we hold the lock."""
    with Lock(conn, "foobar"):
        with TestProcess(sys.executable, HELPER, 'test_no_block') as process:
            with dump_on_error(process.read):
                lock_name = 'lock:foobar'
                wait_for_strings(
                    process.read, TIMEOUT,
                    'Getting %r ...' % lock_name,
                    'Failed to get %r.' % lock_name,
                    'acquire=>False',
                    'DIED.',
                )
98
99
100
def test_timeout(conn):
    """acquire(timeout=...) gives up and returns False while the lock is held."""
    with Lock(conn, "foobar"):
        lock = Lock(conn, "foobar")
        # `is False` instead of `== False`: acquire() returns a bool and
        # PEP 8 (E712) discourages equality comparison against booleans.
        assert lock.acquire(timeout=1) is False
104
105
106
def test_timeout_expire(conn):
    """A held lock with a 1s expiry frees up within a 2s acquire timeout."""
    with Lock(conn, "foobar", expire=1):
        second_lock = Lock(conn, "foobar")
        assert second_lock.acquire(timeout=2)
110
111
112
def test_timeout_expire_with_renewal(conn):
    """Auto-renewal keeps the lock alive past its expiry, so acquire times out."""
    with Lock(conn, "foobar", expire=1, auto_renewal=True):
        lock = Lock(conn, "foobar")
        # `is False` instead of `== False` (PEP 8 E712).
        assert lock.acquire(timeout=2) is False
116
117
118
def test_timeout_acquired(conn):
    """acquire(timeout=...) succeeds once the helper process lets go."""
    with TestProcess(sys.executable, HELPER, 'test_timeout') as process:
        with dump_on_error(process.read):
            lock_name = 'lock:foobar'
            wait_for_strings(
                process.read, TIMEOUT,
                'Getting %r ...' % lock_name,
                'Got lock for %r.' % lock_name,
            )
            local_lock = Lock(conn, "foobar")
            assert local_lock.acquire(timeout=2)
129
130
131
def test_not_usable_timeout(conn):
    """A timeout cannot be combined with a non-blocking acquire."""
    foobar_lock = Lock(conn, "foobar")
    pytest.raises(TimeoutNotUsable, foobar_lock.acquire, blocking=False, timeout=1)
135
136
137
def test_expire_less_than_timeout(conn):
    """A timeout larger than the lock's expiry time is rejected."""
    short_lock = Lock(conn, "foobar", expire=1)
    pytest.raises(TimeoutTooLarge, short_lock.acquire, blocking=True, timeout=2)
141
142
143
def test_invalid_timeout(conn):
    """Zero and negative timeouts are both rejected."""
    for bad_timeout in (0, -1):
        lock = Lock(conn, "foobar")
        with pytest.raises(InvalidTimeout):
            lock.acquire(blocking=True, timeout=bad_timeout)
151
152
153
def test_expire(conn):
    """The lock expires on its own and can then be re-acquired."""
    with Lock(conn, "foobar", expire=TIMEOUT/4):
        with TestProcess(sys.executable, HELPER, 'test_expire') as proc:
            with dump_on_error(proc.read):
                name = 'lock:foobar'
                wait_for_strings(
                    proc.read, TIMEOUT,
                    'Getting %r ...' % name,
                    'Got lock for %r.' % name,
                    'Releasing %r.' % name,
                    'UNLOCK_SCRIPT not cached.',
                    'DIED.',
                )
    lock = Lock(conn, "foobar")
    try:
        # Plain truthiness instead of `== True` (PEP 8 E712); acquire()
        # returns a bool.
        assert lock.acquire(blocking=False)
    finally:
        lock.release()
171
172
173
def test_extend(conn):
    """extend(expire=...) raises the TTL of the lock key."""
    name = 'foobar'
    key_name = 'lock:' + name
    with Lock(conn, name, expire=100) as held_lock:
        ttl_before = conn.ttl(key_name)
        assert ttl_before <= 100

        held_lock.extend(expire=1000)
        assert conn.ttl(key_name) > 100
181
182
183
def test_extend_lock_default_expire(conn):
    """extend() without arguments resets the TTL to the lock's own expiry."""
    name = 'foobar'
    key_name = 'lock:' + name
    with Lock(conn, name, expire=1000) as held_lock:
        # Let a few seconds tick away so the TTL visibly drops ...
        time.sleep(3)
        assert conn.ttl(key_name) <= 997
        # ... then extend and expect it back near the full 1000 seconds.
        held_lock.extend()
        assert 997 < conn.ttl(key_name) <= 1000
191
192
193
def test_extend_lock_without_expire_fail(conn):
    """A lock created without an expiry cannot be extended in any way."""
    name = 'foobar'
    with Lock(conn, name) as held_lock:
        pytest.raises(NotExpirable, held_lock.extend, expire=1000)
        pytest.raises(TypeError, held_lock.extend)
201
202
203
def test_extend_another_instance(conn):
    """It is possible to extend a lock using another instance of Lock with the
    same name.
    """
    name = 'foobar'
    key_name = 'lock:' + name
    original = Lock(conn, name, id='spam', expire=100)
    original.acquire()
    assert 0 <= conn.ttl(key_name) <= 100

    # A second instance sharing the same id may extend the held lock.
    sibling = Lock(conn, name, id='spam')
    sibling.extend(1000)

    assert conn.ttl(key_name) > 100
217
218
219
def test_extend_another_instance_different_id_fail(conn):
    """It is impossible to extend a lock using another instance of Lock with
    the same name, but different id.
    """
    name = 'foobar'
    key_name = 'lock:' + name
    owner = Lock(conn, name, expire=100, id='spam')
    owner.acquire()
    assert 0 <= conn.ttl(key_name) <= 100

    # A different id means a different (non-)owner: extend must be refused.
    impostor = Lock(conn, name, id='eggs')
    with pytest.raises(NotAcquired):
        impostor.extend(1000)

    # TTL is untouched by the failed extend.
    assert conn.ttl(key_name) <= 100
234
235
236
def test_double_acquire(conn):
    """Re-acquiring a held lock raises AlreadyAcquired (a RuntimeError)."""
    lock = Lock(conn, "foobar")
    with lock:
        with pytest.raises(RuntimeError):
            lock.acquire()
        with pytest.raises(AlreadyAcquired):
            lock.acquire()
241
242
243
def test_plain(conn):
    """Plain context-manager usage acquires and releases cleanly."""
    plain_lock = Lock(conn, "foobar")
    with plain_lock:
        time.sleep(0.01)
246
247
248
def test_no_overlap(redis_server):
    """
    This test tries to simulate contention: lots of clients trying to acquire at the same time.

    If there were a bug that allowed two clients to hold the lock at the same time it
    would most likely regress this test.

    The code here mostly tries to parse out the pid of the process and the time when it got and
    released the lock. If there is overlap (eg: pid1.start < pid2.start < pid1.end) then we
    got a very bad regression on our hands ...

    The subprocess being run (check helper.py) will fork a bunch of processes and will try to
    synchronize them (using the builtin sched) to try to acquire the lock at the same time.
    """
    with TestProcess(sys.executable, HELPER, 'test_no_overlap') as proc:
        with dump_on_error(proc.read):
            name = 'lock:foobar'
            wait_for_strings(proc.read, 10*TIMEOUT, 'Getting %r ...' % name)
            wait_for_strings(proc.read, 10*TIMEOUT, 'Got lock for %r.' % name)
            wait_for_strings(proc.read, 10*TIMEOUT, 'Releasing %r.' % name)
            wait_for_strings(proc.read, 10*TIMEOUT, 'UNLOCK_SCRIPT not cached.')
            wait_for_strings(proc.read, 10*TIMEOUT, 'DIED.')

            class Event(object):
                pid = start = end = '?'

                def __str__(self):
                    return "Event(%s; %r => %r)" % (self.pid, self.start, self.end)

            events = defaultdict(Event)
            for line in proc.read().splitlines():
                try:
                    # Fix: the middle field was previously bound to a local
                    # named `time`, shadowing the imported `time` module.
                    pid, timestamp, junk = line.split(' ', 2)
                    pid = int(pid)
                except ValueError:
                    # Not a "<pid> <time> <message>" line; skip it.
                    continue
                if 'Got lock for' in junk:
                    events[pid].pid = pid
                    events[pid].start = timestamp
                if 'Releasing' in junk:
                    events[pid].pid = pid
                    events[pid].end = timestamp
            assert len(events) == 125

            # not very smart but we don't have millions of events so it's
            # ok - compare all the events with all the other events:
            for event in events.values():
                for other in events.values():
                    if other is not event:
                        try:
                            if other.start < event.start < other.end or \
                               other.start < event.end < other.end:
                                pytest.fail('%s overlaps %s' % (event, other))
                        except Exception:
                            # Fix: was a bare `except:`. Dump the pair being
                            # compared for context, then always re-raise.
                            print("[%s/%s]" % (event, other))
                            raise
304
305
306
# Number of concurrent worker processes spawned by the contention tests below.
NWORKERS = 125
307
308
def test_no_overlap2(make_process, make_conn):
    """The second version of contention test, that uses multiprocessing."""
    # Barrier-style event: set once by the parent to start the race.
    go         = multiprocessing.Event()
    count_lock = multiprocessing.Lock()
    # 'H' = unsigned short, large enough for NWORKERS (125).
    count      = multiprocessing.Value('H', 0)

    def workerfn(go, count_lock, count):
        # Each worker opens its own Redis connection and Lock instance.
        redis_lock = Lock(make_conn(), 'lock')
        with count_lock:
            count.value += 1  # phase 1: signal readiness to the parent

        go.wait()  # block here until every worker is ready and the parent fires

        # The redis lock is never released, so only the first worker to get
        # it should ever pass this point.
        if redis_lock.acquire(blocking=True):
            with count_lock:
                count.value += 1  # phase 2: count successful acquisitions

    for _ in range(NWORKERS):
        make_process(target=workerfn, args=(go, count_lock, count)).start()

    # Wait until all workers will come to point when they are ready to acquire
    # the redis lock.
    while count.value < NWORKERS:
        time.sleep(0.5)

    # Then "count" will be used as counter of workers, which acquired
    # redis-lock with success.
    count.value = 0

    go.set()

    # Give the race a moment to settle before checking the winner count.
    time.sleep(1)

    assert count.value == 1
342
343
344
def test_reset(conn):
    """reset() forcibly clears the lock so a new instance can take it."""
    with Lock(conn, "foobar") as held:
        held.reset()
        fresh = Lock(conn, "foobar")
        fresh.acquire(blocking=False)
        fresh.release()
350
351
352
def test_reset_all(conn):
    """reset_all() clears every lock, so both names can be re-acquired."""
    # Take both locks ...
    for lock_name in ("foobar1", "foobar2"):
        Lock(conn, lock_name).acquire(blocking=False)
    # ... wipe everything ...
    reset_all(conn)
    # ... and verify fresh instances can acquire and release both again.
    fresh_locks = [Lock(conn, lock_name) for lock_name in ("foobar1", "foobar2")]
    for fresh in fresh_locks:
        fresh.acquire(blocking=False)
    for fresh in fresh_locks:
        fresh.release()
364
365
366
def test_owner_id(conn):
    """get_owner_id() reports the id the lock was created with."""
    unique_identifier = b"foobar-identifier"
    lock = Lock(conn, "foobar-tok", expire=TIMEOUT/4, id=unique_identifier)
    assert lock.id == unique_identifier
    lock.acquire(blocking=False)
    assert lock.get_owner_id() == unique_identifier
    lock.release()
374
375
376
def test_token(conn):
    """While held, the lock key stores the lock's id as its value."""
    token_lock = Lock(conn, "foobar-tok")
    expected_token = token_lock.id
    assert conn.get(token_lock._name) is None
    token_lock.acquire(blocking=False)
    assert conn.get(token_lock._name) == expected_token
382
383
384
def test_bogus_release(conn):
    """Releasing an unheld lock raises, unless force=True is used."""
    lock = Lock(conn, "foobar-tok")
    with pytest.raises(NotAcquired):
        lock.release()
    lock.release(force=True)
388
389
390
def test_release_from_nonblocking_leaving_garbage(conn):
    """Repeated non-blocking acquire/release keeps exactly one signal entry."""
    for _attempt in range(10):
        garbage_lock = Lock(conn, 'release_from_nonblocking')
        garbage_lock.acquire(blocking=False)
        garbage_lock.release()
        assert conn.llen('lock-signal:release_from_nonblocking') == 1
396
397
398
def test_no_auto_renewal(conn):
    """auto_renewal=False must not set a renewal interval or spawn a thread."""
    plain_lock = Lock(conn, 'lock_renewal', expire=3, auto_renewal=False)
    assert plain_lock._lock_renewal_interval is None
    plain_lock.acquire()
    assert plain_lock._lock_renewal_thread is None, "No lock refresh thread should have been spawned"
403
404
405
def test_auto_renewal_bad_values(conn):
    """auto_renewal without an expiry time is rejected at construction."""
    pytest.raises(ValueError, Lock, conn, 'lock_renewal', expire=None, auto_renewal=True)
408
409
410
def test_auto_renewal(conn):
    """Auto-renewal spawns a refresh thread that keeps the key alive."""
    renewed_lock = Lock(conn, 'lock_renewal', expire=3, auto_renewal=True)
    renewed_lock.acquire()

    renewal_thread = renewed_lock._lock_renewal_thread
    assert isinstance(renewal_thread, InterruptableThread)
    assert not renewal_thread.should_exit
    assert renewed_lock._lock_renewal_interval == 2

    # Sleep past the expiry; the renewal thread must have kept the key.
    time.sleep(3)
    assert conn.get(renewed_lock._name) == renewed_lock.id, "Key expired but it should have been getting renewed"

    # Releasing tears the renewal thread down.
    renewed_lock.release()
    assert renewed_lock._lock_renewal_thread is None
423
424
425
def test_signal_expiration(conn):
    """Signal keys expire within two seconds after releasing the lock."""
    expiring_lock = Lock(conn, 'signal_expiration')
    expiring_lock.acquire()
    expiring_lock.release()
    time.sleep(2)
    assert conn.llen('lock-signal:signal_expiration') == 0
432
433
434
def test_reset_signalizes(make_conn, make_process):
    """Call to reset() causes LPUSH to signal key, so blocked waiters
    become unblocked."""
    def workerfn(unblocked):
        conn = make_conn()
        lock = Lock(conn, 'lock')
        # Blocks until the parent resets the lock, then flags success.
        if lock.acquire():
            unblocked.value = 1

    # 'B' = unsigned char flag shared with the worker process.
    unblocked = multiprocessing.Value('B', 0)
    conn = make_conn()
    lock = Lock(conn, 'lock')
    lock.acquire()

    worker = make_process(target=workerfn, args=(unblocked,))
    worker.start()
    # Give the worker time to block on acquire() ...
    worker.join(0.5)
    # ... then reset the held lock, which should unblock the waiter.
    lock.reset()
    worker.join(0.5)

    assert unblocked.value == 1
455
456
457
def test_reset_all_signalizes(make_conn, make_process):
    """Call to reset_all() causes LPUSH to all signal keys, so blocked waiters
    become unblocked."""
    def workerfn(unblocked):
        conn = make_conn()
        lock1 = Lock(conn, 'lock1')
        lock2 = Lock(conn, 'lock2')
        # Blocks on both locks in turn; flags success only if both acquire.
        if lock1.acquire() and lock2.acquire():
            unblocked.value = 1

    # 'B' = unsigned char flag shared with the worker process.
    unblocked = multiprocessing.Value('B', 0)
    conn = make_conn()
    lock1 = Lock(conn, 'lock1')
    lock2 = Lock(conn, 'lock2')
    lock1.acquire()
    lock2.acquire()

    worker = make_process(target=workerfn, args=(unblocked,))
    worker.start()
    # Give the worker time to block on the first acquire() ...
    worker.join(0.5)
    # ... then clear every lock at once, which should unblock it fully.
    reset_all(conn)
    worker.join(0.5)

    assert unblocked.value == 1
481