RedisQueue.__init__()   A
last analyzed

Complexity

Conditions 2

Size

Total Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
c 0
b 0
f 0
dl 0
loc 19
rs 9.4285
1
#!/usr/bin/env python
2
# -*- encoding: utf-8 -*-
3
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
4
# Author: Binux<[email protected]>
5
#         http://binux.me
6
# Created on 2015-04-27 22:48:04
7
8
import time
9
import redis
10
import umsgpack
11
from six.moves import queue as BaseQueue
12
13
14
class RedisQueue(object):
15
    """
16
    A Queue like message built over redis
17
    """
18
19
    Empty = BaseQueue.Empty
20
    Full = BaseQueue.Full
21
    max_timeout = 0.3
22
23
    def __init__(self, name, host='localhost', port=6379, db=0,
24
                 maxsize=0, lazy_limit=True, password=None, cluster_nodes=None):
25
        """
26
        Constructor for RedisQueue
27
28
        maxsize:    an integer that sets the upperbound limit on the number of
29
                    items that can be placed in the queue.
30
        lazy_limit: redis queue is shared via instance, a lazy size limit is used
31
                    for better performance.
32
        """
33
        self.name = name
34
        if(cluster_nodes is not None):
35
            from rediscluster import StrictRedisCluster
36
            self.redis = StrictRedisCluster(startup_nodes=cluster_nodes)
37
        else:
38
            self.redis = redis.StrictRedis(host=host, port=port, db=db, password=password)
39
        self.maxsize = maxsize
40
        self.lazy_limit = lazy_limit
41
        self.last_qsize = 0
42
43
    def qsize(self):
44
        self.last_qsize = self.redis.llen(self.name)
45
        return self.last_qsize
46
47
    def empty(self):
48
        if self.qsize() == 0:
49
            return True
50
        else:
51
            return False
52
53
    def full(self):
54
        if self.maxsize and self.qsize() >= self.maxsize:
55
            return True
56
        else:
57
            return False
58
59
    def put_nowait(self, obj):
60
        if self.lazy_limit and self.last_qsize < self.maxsize:
61
            pass
62
        elif self.full():
63
            raise self.Full
64
        self.last_qsize = self.redis.rpush(self.name, umsgpack.packb(obj))
65
        return True
66
67
    def put(self, obj, block=True, timeout=None):
68
        if not block:
69
            return self.put_nowait(obj)
70
71
        start_time = time.time()
72
        while True:
73
            try:
74
                return self.put_nowait(obj)
75
            except self.Full:
76
                if timeout:
77
                    lasted = time.time() - start_time
78
                    if timeout > lasted:
79
                        time.sleep(min(self.max_timeout, timeout - lasted))
80
                    else:
81
                        raise
82
                else:
83
                    time.sleep(self.max_timeout)
84
85
    def get_nowait(self):
86
        ret = self.redis.lpop(self.name)
87
        if ret is None:
88
            raise self.Empty
89
        return umsgpack.unpackb(ret)
90
91
    def get(self, block=True, timeout=None):
92
        if not block:
93
            return self.get_nowait()
94
95
        start_time = time.time()
96
        while True:
97
            try:
98
                return self.get_nowait()
99
            except self.Empty:
100
                if timeout:
101
                    lasted = time.time() - start_time
102
                    if timeout > lasted:
103
                        time.sleep(min(self.max_timeout, timeout - lasted))
104
                    else:
105
                        raise
106
                else:
107
                    time.sleep(self.max_timeout)
108
109
Queue = RedisQueue
110