test()   F
last analyzed

Complexity

Conditions 9

Size

Total Lines 28

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 9
c 1
b 0
f 0
dl 0
loc 28
rs 3
1
import logging
2
# Emit bare messages at WARN and above: the logging.critical() result table
# is printed, while the logging.info() progress messages stay silent.
logging.basicConfig(level="WARN", format="%(message)s")
3
4
import os
5
import sys
6
import time
7
import signal
8
from futures import ProcessPoolExecutor
9
from sched import scheduler
10
from redis import StrictRedis
11
from redis_lock import Lock
12
13
class Exit(Exception):
    """Raised from the SIGALRM handler to stop a running benchmark loop."""
15
16
def bail(n, f):
    """Signal handler that aborts whatever is running by raising ``Exit``.

    ``n`` and ``f`` are the signal number and current stack frame that the
    ``signal`` module passes to every handler; both are ignored.
    """
    raise Exit()
18
19
# Route SIGALRM to bail() so that the ITIMER_REAL timer armed inside test()
# interrupts the benchmark loop by raising Exit.
signal.signal(signal.SIGALRM, bail)
20
21
def test(args):
    """Run one benchmark worker and return the number of lock cycles done.

    ``args`` is a single ``(t, duration, type_)`` tuple, i.e. one item of the
    workload handed to ``pool.map``:

    * ``t`` -- absolute ``time.time()`` timestamp at which the loop starts,
      so that every worker given the same ``t`` begins at the same moment;
    * ``duration`` -- seconds to sleep while holding the lock;
    * ``type_`` -- ``'redis_lock'`` to use ``redis_lock.Lock`` or
      ``'native'`` to use ``StrictRedis.lock``; any other value performs no
      iterations and returns 0.

    The loop runs for ``int(sys.argv[1])`` seconds: an ``ITIMER_REAL`` timer
    is armed and the module-level SIGALRM handler raises to break out of it.
    """
    # Unpacked here rather than in the signature: tuple parameters are a
    # syntax error on Python 3 (PEP 3113). Callers still pass one tuple.
    t, duration, type_ = args
    conn = StrictRedis()
    # Start from an empty database so no key left by a previous run exists.
    conn.flushdb()
    # sched discards the action's return value, so run() reports through
    # this list instead.
    ret = []

    def run():
        iterations = 0
        signal.setitimer(signal.ITIMER_REAL, int(sys.argv[1]))
        try:
            # The two loops are kept separate (not hidden behind a factory)
            # so no extra call overhead is added to the measured path.
            if type_ == 'redis_lock':
                while True:
                    with Lock(conn, "test-lock", expire=5):
                        iterations += 1
                        time.sleep(duration)
            elif type_ == 'native':
                while True:
                    with conn.lock("test-lock", timeout=5):
                        iterations += 1
                        time.sleep(duration)
        except Exception as exc:
            # Normally this is the exception raised by the SIGALRM handler.
            # KeyboardInterrupt/SystemExit are deliberately not swallowed.
            logging.info("Got %r. Returning ...", exc)
        finally:
            # Disarm the timer if the loop ended before it fired, so a late
            # SIGALRM cannot raise outside this try block.
            signal.setitimer(signal.ITIMER_REAL, 0)
        ret.append(iterations)

    sched = scheduler(time.time, time.sleep)
    logging.info("Running in %s seconds ...", t - time.time())
    sched.enterabs(t, 0, run, ())
    sched.run()
    return ret[0]
49
# Results are printed as a fixed-width table: rule, header, rule, one row per
# (type, duration, concurrency) combination, closing rule.
logging.critical("========== ======== =========== ======== ========== ===== =====")
logging.critical("Type       Duration Concurrency Sum      Avg        Min   Max")
logging.critical("========== ======== =========== ======== ========== ===== =====")

for type_ in ('redis_lock', 'native'):
    for duration in (0, 0.001, 0.01, 0.05, 0.1):
        for concurrency in (1, 2, 3, 4, 5, 6, 12, 24, 48):
            with ProcessPoolExecutor(max_workers=concurrency) as pool:
                # Shared start time on the next whole second, so that all
                # workers begin their loops together.
                t = round(time.time()) + 1
                load = [(t, duration, type_) for _ in range(concurrency)]
                logging.info("Running %s", load)
                ret = list(pool.map(test, load))

            total = sum(ret)
            logging.critical(
                "%10s %-8.3f %-11s %-8s %-10.2f %-5s %-5s",
                type_, duration, concurrency, total,
                # float() forces true division; on Python 2 int / int would
                # floor the average before it is formatted with %.2f.
                float(total) / len(ret), min(ret), max(ret)
            )
logging.critical("========== ======== =========== ======== ========== ===== =====")
86