BeanstalkQueue.get()   B
last analyzed

Complexity

Conditions 6

Size

Total Lines 17

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
dl 0
loc 17
rs 8
c 0
b 0
f 0
1
#!/usr/bin/env python
2
# coding:utf-8
3
"""beanstalk queue - queue based on beanstalk
4
5
6
Setting: beanstalkd's max-job-size must be raised above its default of 65535, for example:
7
DAEMON_OPTS="-l $BEANSTALKD_LISTEN_ADDR -p $BEANSTALKD_LISTEN_PORT -z 524288"
8
"""
9
10
import time
11
import umsgpack
12
import beanstalkc
13
import threading
14
import logging
15
16
from six.moves import queue as BaseQueue
17
18
19
class BeanstalkQueue(object):
    """Queue-like wrapper around one beanstalkd tube.

    Offers ``put``/``get``/``qsize``/``empty``/``full`` plus the ``Empty`` and
    ``Full`` exception classes taken from the standard queue module. Payloads
    are serialized with ``umsgpack``. Commands issued by ``stats()``,
    ``put_nowait()`` and ``get_nowait()`` run while holding ``self.lock``.
    """

    # Upper bound, in seconds, on each sleep between retries in put()/get().
    max_timeout = 0.3
    Empty = BaseQueue.Empty
    Full = BaseQueue.Full

    def __init__(self, name, host='localhost:11300', maxsize=0):
        """Create the queue and connect to beanstalkd.

        :param name: tube name, both used and watched by the connection.
        :param host: ``"hostname:port"``; a missing or empty hostname falls
            back to ``localhost`` and a missing or empty port to ``11300``.
        :param maxsize: ready-job count at which ``full()`` reports True;
            a falsy value disables the limit.
        """
        self.name = name

        config = host.split(':')
        # str.split() always yields at least one item, so the fallback has to
        # test the values themselves rather than the list length.
        self.host = config[0] or 'localhost'
        self.port = int(config[1]) if len(config) > 1 and config[1] else 11300
        self.lock = threading.RLock()
        self.maxsize = maxsize
        self.reconnect()

    def stats(self):
        """Return the tube statistics as a dict of string keys and values.

        Returns an empty dict when the server answers ``NOT_FOUND`` for the
        tube. Any other ``beanstalkc.CommandFailed`` is re-raised.
        """
        try:
            with self.lock:
                raw = self.connection.stats_tube(self.name)
        except beanstalkc.CommandFailed as err:
            # Exceptions are not indexable on Python 3; read the status from
            # ``args`` instead (works on Python 2 as well).
            if len(err.args) > 1 and err.args[1] == 'NOT_FOUND':
                # tube is empty
                return {}
            raise

        parsed = {}
        for line in raw.split('\n'):
            fields = line.split(': ')
            # Only "key: value" lines are kept; markers such as "---" and
            # blank lines do not split into exactly two parts.
            if len(fields) == 2:
                parsed[fields[0]] = fields[1]
        return parsed

    def reconnect(self):
        """Open a fresh connection and bind it to this queue's tube."""
        self.connection = beanstalkc.Connection(host=self.host, port=self.port, parse_yaml=False)
        self.connection.use(self.name)
        self.connection.watch(self.name)

    def qsize(self):
        """Return ``current-jobs-ready`` from the tube stats (0 if absent)."""
        return int(self.stats().get('current-jobs-ready', 0))

    def empty(self):
        """Return True when ``qsize()`` is zero."""
        return self.qsize() == 0

    def full(self):
        """Return True when ``maxsize`` is set and ``qsize()`` has reached it."""
        return bool(self.maxsize) and self.qsize() >= self.maxsize

    def _retry_until(self, attempt, retry_on, timeout):
        """Call ``attempt()`` until it stops raising ``retry_on``.

        Sleeps at most ``max_timeout`` seconds between attempts. With a truthy
        ``timeout`` the last ``retry_on`` exception is re-raised once that many
        seconds have elapsed; a falsy ``timeout`` retries indefinitely.
        """
        start_time = time.time()
        while True:
            try:
                return attempt()
            except retry_on:
                if not timeout:
                    time.sleep(self.max_timeout)
                    continue
                lasted = time.time() - start_time
                if timeout <= lasted:
                    raise
                time.sleep(min(self.max_timeout, timeout - lasted))

    def put(self, obj, block=True, timeout=None):
        """Put ``obj`` on the queue.

        :param block: when False, behave exactly like ``put_nowait``.
        :param timeout: seconds to keep retrying while the queue is full;
            a falsy value (``None`` or ``0``) retries indefinitely.
        :raises Full: queue is full (immediately if non-blocking, otherwise
            after ``timeout`` seconds).
        """
        if not block:
            return self.put_nowait(obj)
        return self._retry_until(lambda: self.put_nowait(obj), BaseQueue.Full, timeout)

    def put_nowait(self, obj):
        """Serialize ``obj`` and put it on the tube, or raise ``Full``.

        Returns whatever ``connection.put`` returns.
        """
        if self.full():
            raise BaseQueue.Full

        with self.lock:
            return self.connection.put(umsgpack.packb(obj))

    def get(self, block=True, timeout=None):
        """Remove and return one item from the queue.

        :param block: when False, behave exactly like ``get_nowait``.
        :param timeout: seconds to keep retrying while the queue is empty;
            a falsy value (``None`` or ``0``) retries indefinitely.
        :raises Empty: nothing available (immediately if non-blocking,
            otherwise after ``timeout`` seconds).
        """
        if not block:
            return self.get_nowait()
        return self._retry_until(self.get_nowait, BaseQueue.Empty, timeout)

    def get_nowait(self):
        """Reserve one job without waiting, decode it, delete it, return it.

        :raises Empty: no job could be reserved, or the server reported
            ``DeadlineSoon``.
        """
        with self.lock:
            # Keep the try body to the one call that can report DeadlineSoon.
            try:
                job = self.connection.reserve(0)
            except beanstalkc.DeadlineSoon:
                raise BaseQueue.Empty
            if not job:
                raise BaseQueue.Empty
            # Decode before deleting: if unpacking raises, the job is not
            # deleted.
            body = umsgpack.unpackb(job.body)
            job.delete()
            return body
126
127
128
# Expose the class under the generic name ``Queue`` as well.
Queue = BeanstalkQueue
129