Completed
Push — master ( c5f31b...4822be )
by Ionel Cristian
03:52
created

tests.test_reset_all_signalizes()   B

Complexity

Conditions 5

Size

Total Lines 24

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 5
dl 0
loc 24
rs 8.1672
1
from __future__ import print_function
2
3
import os
4
import sys
5
import time
6
from collections import defaultdict
7
import multiprocessing
8
9
import pytest
10
from process_tests import TestProcess
11
from process_tests import dump_on_error
12
from process_tests import wait_for_strings
13
14
from redis import StrictRedis
15
16
from redis_lock import AlreadyAcquired
17
from redis_lock import InterruptableThread
18
from redis_lock import InvalidTimeout
19
from redis_lock import Lock
20
from redis_lock import NotAcquired
21
from redis_lock import TimeoutTooLarge
22
from redis_lock import TimeoutNotUsable
23
from redis_lock import NotExpirable
24
from redis_lock import reset_all
25
from conf import HELPER
26
from conf import TIMEOUT
27
from conf import UDS_PATH
28
29
30
@pytest.yield_fixture
def redis_server(scope='module'):
    """Start a ``redis-server`` subprocess bound to the unix socket ``UDS_PATH``.

    A leftover socket file is removed first. The ``TestProcess`` is yielded
    once the server output contains "Running"; leaving the ``with`` block
    ends the context managers that own the process.

    NOTE(review): ``scope`` is an ordinary default argument of this function,
    not an option passed to the fixture decorator (which is applied without
    arguments), so it presumably does not give the fixture module scope.
    Confirm the intended scope before changing it -- several tests wait for
    "UNLOCK_SCRIPT not cached." and may depend on a fresh server.
    """
    try:
        os.unlink(UDS_PATH)
    except OSError:
        # Best effort: there may simply be no stale socket to remove.
        pass

    server_cmd = ('redis-server', '--port', '0', '--unixsocket', UDS_PATH)
    with TestProcess(*server_cmd) as process, dump_on_error(process.read):
        wait_for_strings(process.read, TIMEOUT, "Running")
        yield process
40
41
42
@pytest.fixture(scope='function')
def make_conn(request, redis_server):
    """Redis connection factory.

    Returns a zero-argument callable. Each call opens a ``StrictRedis``
    connection over the unix socket at ``UDS_PATH`` and registers that
    connection's ``flushdb`` as a finalizer on the requesting test.
    """
    def make_conn_factory():
        connection = StrictRedis(unix_socket_path=UDS_PATH)
        request.addfinalizer(connection.flushdb)
        return connection

    return make_conn_factory
51
52
53
@pytest.fixture(scope='function')
def conn(request, make_conn):
    """Provide one connection created by the ``make_conn`` factory."""
    connection = make_conn()
    return connection
56
57
58
@pytest.fixture
def make_process(request):
    """Process factory, that makes processes, that are stopped after a test run.

    Returns a callable accepting the same arguments as
    ``multiprocessing.Process``. Every process it creates gets a finalizer
    on the requesting test that terminates the process if it is still alive
    and then joins it.
    """
    def make_process_factory(*args, **kwargs):
        process = multiprocessing.Process(*args, **kwargs)

        def stop_process():
            # Only a live process is terminated: calling terminate() on a
            # process that was created but never started would fail in the
            # finalizer instead of being a harmless no-op.
            if process.is_alive():
                process.terminate()
                # Bounded join so the terminated child gets reaped without
                # risking a hang in teardown.
                process.join(TIMEOUT)

        request.addfinalizer(stop_process)
        return process

    return make_process_factory
70
71
72
def test_simple(redis_server):
    """Run the helper's ``test_simple`` scenario and wait for its expected output."""
    name = 'lock:foobar'
    expected_output = (
        'Getting %r ...' % name,
        'Got lock for %r.' % name,
        'Releasing %r.' % name,
        'UNLOCK_SCRIPT not cached.',
        'DIED.',
    )
    with TestProcess(sys.executable, HELPER, 'test_simple') as proc:
        with dump_on_error(proc.read):
            wait_for_strings(proc.read, TIMEOUT, *expected_output)
84
85
86
def test_no_block(conn):
    """While this process holds "foobar", the helper's ``test_no_block`` run
    must report that it failed to get the lock."""
    name = 'lock:foobar'
    expected_output = (
        'Getting %r ...' % name,
        'Failed to get %r.' % name,
        'acquire=>False',
        'DIED.',
    )
    with Lock(conn, "foobar"):
        with TestProcess(sys.executable, HELPER, 'test_no_block') as proc:
            with dump_on_error(proc.read):
                wait_for_strings(proc.read, TIMEOUT, *expected_output)
98
99
100
def test_timeout(conn):
    """A second lock on a held name must not be acquired within a 1 second timeout."""
    with Lock(conn, "foobar"):
        lock = Lock(conn, "foobar")
        # Truthiness check instead of comparing against the False literal.
        assert not lock.acquire(timeout=1)
104
105
106
def test_timeout_expire(conn):
    """With the holder created with ``expire=1``, a second lock on the same
    name must be acquired within a 2 second timeout."""
    with Lock(conn, "foobar", expire=1):
        contender = Lock(conn, "foobar")
        acquired = contender.acquire(timeout=2)
        assert acquired
110
111
112
def test_timeout_expire_with_renewal(conn):
    """With the holder created with ``expire=1`` and ``auto_renewal=True``, a
    second lock on the same name must not be acquired within 2 seconds."""
    with Lock(conn, "foobar", expire=1, auto_renewal=True):
        lock = Lock(conn, "foobar")
        # Truthiness check instead of comparing against the False literal.
        assert not lock.acquire(timeout=2)
116
117
118
def test_timeout_acquired(conn):
    """After the helper's ``test_timeout`` run reports it got the lock, this
    process must still be able to acquire it within a 2 second timeout."""
    name = 'lock:foobar'
    with TestProcess(sys.executable, HELPER, 'test_timeout') as proc:
        with dump_on_error(proc.read):
            helper_output = (
                'Getting %r ...' % name,
                'Got lock for %r.' % name,
            )
            wait_for_strings(proc.read, TIMEOUT, *helper_output)

            contender = Lock(conn, "foobar")
            acquired = contender.acquire(timeout=2)
            assert acquired
129
130
131
def test_not_usable_timeout(conn):
    """A timeout combined with ``blocking=False`` raises ``TimeoutNotUsable``."""
    lock = Lock(conn, "foobar")
    pytest.raises(TimeoutNotUsable, lock.acquire, blocking=False, timeout=1)
135
136
137
def test_expire_less_than_timeout(conn):
    """A timeout of 2 on a lock created with ``expire=1`` raises ``TimeoutTooLarge``."""
    lock = Lock(conn, "foobar", expire=1)
    pytest.raises(TimeoutTooLarge, lock.acquire, blocking=True, timeout=2)
141
142
143
def test_invalid_timeout(conn):
    """Timeouts of 0 and -1 on a blocking acquire raise ``InvalidTimeout``."""
    for bad_timeout in (0, -1):
        lock = Lock(conn, "foobar")
        with pytest.raises(InvalidTimeout):
            lock.acquire(blocking=True, timeout=bad_timeout)
151
152
153
def test_expire(conn):
    """Hold "foobar" with ``expire=TIMEOUT/4`` while the helper's
    ``test_expire`` run is expected to log a full get/release cycle; after
    that a fresh non-blocking acquire must succeed."""
    with Lock(conn, "foobar", expire=TIMEOUT/4):
        with TestProcess(sys.executable, HELPER, 'test_expire') as proc:
            with dump_on_error(proc.read):
                name = 'lock:foobar'
                wait_for_strings(
                    proc.read, TIMEOUT,
                    'Getting %r ...' % name,
                    'Got lock for %r.' % name,
                    'Releasing %r.' % name,
                    'UNLOCK_SCRIPT not cached.',
                    'DIED.',
                )
    lock = Lock(conn, "foobar")
    # The release is deliberately not in a ``finally``: releasing a lock that
    # was never acquired raises, which would hide the assertion failure.
    assert lock.acquire(blocking=False)
    lock.release()
171
172
173
def test_extend(conn):
    """``extend(expire=1000)`` must raise the key's TTL above the initial 100."""
    lock_name = 'foobar'
    redis_key = 'lock:' + lock_name
    with Lock(conn, lock_name, expire=100) as lock:
        ttl_before = conn.ttl(redis_key)
        assert ttl_before <= 100

        lock.extend(expire=1000)
        ttl_after = conn.ttl(redis_key)
        assert ttl_after > 100
181
182
183
def test_extend_lock_default_expire(conn):
    """``extend()`` without arguments must bring the TTL back into the
    (997, 1000] range after 3 seconds have passed on an ``expire=1000`` lock."""
    lock_name = 'foobar'
    redis_key = 'lock:' + lock_name
    with Lock(conn, lock_name, expire=1000) as lock:
        time.sleep(3)
        ttl_before = conn.ttl(redis_key)
        assert ttl_before <= 997

        lock.extend()
        ttl_after = conn.ttl(redis_key)
        assert ttl_after > 997 and ttl_after <= 1000
191
192
193
def test_extend_lock_without_expire_fail(conn):
    """Extending a lock created without ``expire`` raises ``NotExpirable``,
    with or without an explicit new expiry."""
    with Lock(conn, 'foobar') as lock:
        pytest.raises(NotExpirable, lock.extend, expire=1000)
        pytest.raises(NotExpirable, lock.extend)
201
202
203
def test_double_acquire(conn):
    """Acquiring a lock object that is already held raises ``AlreadyAcquired``,
    which must also be catchable as ``RuntimeError``."""
    lock = Lock(conn, "foobar")
    with lock:
        with pytest.raises(RuntimeError):
            lock.acquire()
        with pytest.raises(AlreadyAcquired):
            lock.acquire()
208
209
210
def test_plain(conn):
    """Hold the lock through a context manager for a short moment."""
    lock = Lock(conn, "foobar")
    with lock:
        time.sleep(0.01)
213
214
215
def test_no_overlap(redis_server):
    """
    This test tries to simulate contention: lots of clients trying to acquire at the same time.

    If there were a bug that allowed two clients to hold the lock at the same time, it
    would most likely make this test fail.

    The code here mostly tries to parse out the pid of the process and the time when it got and
    released the lock. If there is overlap (eg: pid1.start < pid2.start < pid1.end) then we
    got a very bad regression on our hands ...

    The subprocess being run (check helper.py) will fork a bunch of processes and will try to
    synchronize them (using the builtin sched) to try to acquire the lock at the same time.
    """
    with TestProcess(sys.executable, HELPER, 'test_no_overlap') as proc:
        with dump_on_error(proc.read):
            name = 'lock:foobar'
            wait_for_strings(proc.read, 10*TIMEOUT, 'Getting %r ...' % name)
            wait_for_strings(proc.read, 10*TIMEOUT, 'Got lock for %r.' % name)
            wait_for_strings(proc.read, 10*TIMEOUT, 'Releasing %r.' % name)
            wait_for_strings(proc.read, 10*TIMEOUT, 'UNLOCK_SCRIPT not cached.')
            wait_for_strings(proc.read, 10*TIMEOUT, 'DIED.')

            class Event(object):
                # '?' marks a field that was never seen in the output.
                pid = start = end = '?'

                def __str__(self):
                    return "Event(%s; %r => %r)" % (self.pid, self.start, self.end)

            # One Event per pid, filled in from "<pid> <time> <message>" lines.
            events = defaultdict(Event)
            for line in proc.read().splitlines():
                try:
                    # Named ``timestamp`` so the imported ``time`` module is
                    # not shadowed inside this function.
                    pid, timestamp, junk = line.split(' ', 2)
                    pid = int(pid)
                except ValueError:
                    # Not an event line (too few fields or non-numeric pid).
                    continue
                if 'Got lock for' in junk:
                    events[pid].pid = pid
                    events[pid].start = timestamp
                if 'Releasing' in junk:
                    events[pid].pid = pid
                    events[pid].end = timestamp
            assert len(events) == 125

            # not very smart but we don't have millions of events so it's
            # ok - compare all the events with all the other events:
            # NOTE(review): start/end are compared as the raw strings parsed
            # above, with no numeric conversion -- verify that helper.py's
            # time format orders correctly as text.
            for event in events.values():
                for other in events.values():
                    if other is not event:
                        try:
                            if other.start < event.start < other.end or \
                               other.start < event.end < other.end:
                                pytest.fail('%s overlaps %s' % (event, other))
                        except BaseException:
                            # Deliberately as broad as a bare ``except``: the
                            # offending pair is printed for any error raised
                            # here, then the error is re-raised unchanged.
                            print("[%s/%s]" % (event, other))
                            raise
271
272
273
# Number of worker processes started by test_no_overlap2.
NWORKERS = 125
274
275
def test_no_overlap2(make_process, make_conn):
    """The second version of contention test, that uses multiprocessing.

    ``NWORKERS`` processes each create a lock named 'lock', report readiness
    through a shared counter, wait for a common event and then all try to
    acquire. The same counter is then reused to count successful acquires,
    and exactly one is expected after half a second.
    """
    go = multiprocessing.Event()
    count_lock = multiprocessing.Lock()
    count = multiprocessing.Value('H', 0)

    def workerfn(go, count_lock, count):
        redis_lock = Lock(make_conn(), 'lock')
        with count_lock:
            count.value += 1

        go.wait()

        if redis_lock.acquire(blocking=True):
            with count_lock:
                count.value += 1

    for _ in range(NWORKERS):
        make_process(target=workerfn, args=(go, count_lock, count)).start()

    # Wait until all workers will come to point when they are ready to acquire
    # the redis lock. The wait is bounded so that a worker which died before
    # reporting in fails the test instead of hanging it forever.
    deadline = time.time() + 10 * TIMEOUT
    while count.value < NWORKERS:
        assert time.time() < deadline, (
            "only %d of %d workers became ready" % (count.value, NWORKERS))
        time.sleep(0.05)

    # Then "count" will be used as counter of workers, which acquired
    # redis-lock with success.
    count.value = 0

    go.set()

    time.sleep(0.5)

    assert count.value == 1
309
310
311
def test_reset(conn):
    """After ``reset()`` on a held lock, another lock object on the same name
    must be acquirable without blocking."""
    with Lock(conn, "foobar") as lock:
        lock.reset()
        new_lock = Lock(conn, "foobar")
        # Check the result here so a failed acquire is reported as such,
        # rather than surfacing as an error from release() below.
        assert new_lock.acquire(blocking=False)
        new_lock.release()
317
318
319
def test_reset_all(conn):
    """After ``reset_all(conn)``, two previously held names must both be
    acquirable again without blocking."""
    lock1 = Lock(conn, "foobar1")
    lock2 = Lock(conn, "foobar2")
    # Every acquire result is asserted so a failure points at the acquire
    # itself instead of at a later release().
    assert lock1.acquire(blocking=False)
    assert lock2.acquire(blocking=False)
    reset_all(conn)
    lock1 = Lock(conn, "foobar1")
    lock2 = Lock(conn, "foobar2")
    assert lock1.acquire(blocking=False)
    assert lock2.acquire(blocking=False)
    lock1.release()
    lock2.release()
331
332
333
def test_owner_id(conn):
    """A caller-supplied ``id`` is exposed as ``lock.id`` and, once the lock
    is acquired, returned by ``get_owner_id()``."""
    expected_id = b"foobar-identifier"
    lock = Lock(conn, "foobar-tok", expire=TIMEOUT/4, id=expected_id)
    assert lock.id == expected_id

    lock.acquire(blocking=False)
    owner_id = lock.get_owner_id()
    assert owner_id == expected_id
    lock.release()
341
342
343
def test_token(conn):
    """The lock's key is absent before acquiring and holds ``lock.id`` afterwards."""
    lock = Lock(conn, "foobar-tok")
    token = lock.id
    stored_before = conn.get(lock._name)
    assert stored_before is None

    lock.acquire(blocking=False)
    stored_after = conn.get(lock._name)
    assert stored_after == token
349
350
351
def test_bogus_release(conn):
    """Releasing an unacquired lock raises ``NotAcquired``; the same call
    with ``force=True`` must not raise."""
    lock = Lock(conn, "foobar-tok")
    with pytest.raises(NotAcquired):
        lock.release()
    lock.release(force=True)
355
356
357
def test_release_from_nonblocking_leaving_garbage(conn):
    """Across ten non-blocking acquire/release cycles the signal list must
    always hold exactly one entry."""
    signal_key = 'lock-signal:release_from_nonblocking'
    attempts = 10
    while attempts:
        lock = Lock(conn, 'release_from_nonblocking')
        lock.acquire(blocking=False)
        lock.release()
        assert conn.llen(signal_key) == 1
        attempts -= 1
363
364
365
def test_no_auto_renewal(conn):
    """With ``auto_renewal=False`` there is no renewal interval and no
    renewal thread after acquiring."""
    lock = Lock(conn, 'lock_renewal', expire=3, auto_renewal=False)
    interval = lock._lock_renewal_interval
    assert interval is None

    lock.acquire()
    renewal_thread = lock._lock_renewal_thread
    assert renewal_thread is None, "No lock refresh thread should have been spawned"
370
371
372
def test_auto_renewal_bad_values(conn):
    """``auto_renewal=True`` together with ``expire=None`` raises ``ValueError``."""
    pytest.raises(ValueError, Lock, conn, 'lock_renewal', expire=None, auto_renewal=True)
375
376
377
def test_auto_renewal(conn):
    """With ``expire=3`` and ``auto_renewal=True`` the key must still hold the
    lock's id after 3 seconds, and releasing must clear the renewal thread."""
    lock = Lock(conn, 'lock_renewal', expire=3, auto_renewal=True)
    lock.acquire()

    renewal_thread = lock._lock_renewal_thread
    assert isinstance(renewal_thread, InterruptableThread)
    assert not renewal_thread.should_exit
    assert lock._lock_renewal_interval == 2

    time.sleep(3)
    stored_id = conn.get(lock._name)
    assert stored_id == lock.id, "Key expired but it should have been getting renewed"

    lock.release()
    assert lock._lock_renewal_thread is None
390
391
392
def test_signal_expiration(conn):
    """Signal keys expire within two seconds after releasing the lock."""
    signal_key = 'lock-signal:signal_expiration'
    lock = Lock(conn, 'signal_expiration')
    lock.acquire()
    lock.release()

    time.sleep(2)
    remaining = conn.llen(signal_key)
    assert remaining == 0
399
400
401
def test_reset_signalizes(make_conn, make_process):
    """Call to reset() causes LPUSH to signal key, so blocked waiters
    become unblocked."""
    def workerfn(unblocked):
        # Runs in the child process: record a successful acquire in the
        # shared flag.
        waiter_lock = Lock(make_conn(), 'lock')
        if waiter_lock.acquire():
            unblocked.value = 1

    flag = multiprocessing.Value('B', 0)
    holder = Lock(make_conn(), 'lock')
    holder.acquire()

    worker = make_process(target=workerfn, args=(flag,))
    worker.start()
    # Give the worker up to half a second before and after the reset.
    worker.join(0.5)
    holder.reset()
    worker.join(0.5)

    assert flag.value == 1
422
423
424
def test_reset_all_signalizes(make_conn, make_process):
    """Call to reset_all() causes LPUSH to all signal keys, so blocked waiters
    become unblocked."""
    def workerfn(unblocked):
        # Runs in the child process: the shared flag is set only when both
        # locks were acquired, in this order.
        worker_conn = make_conn()
        first = Lock(worker_conn, 'lock1')
        second = Lock(worker_conn, 'lock2')
        if first.acquire() and second.acquire():
            unblocked.value = 1

    flag = multiprocessing.Value('B', 0)
    conn = make_conn()
    held1 = Lock(conn, 'lock1')
    held2 = Lock(conn, 'lock2')
    held1.acquire()
    held2.acquire()

    worker = make_process(target=workerfn, args=(flag,))
    worker.start()
    # Give the worker up to half a second before and after the reset.
    worker.join(0.5)
    reset_all(conn)
    worker.join(0.5)

    assert flag.value == 1
448