AmqpQueue   A
last analyzed

Complexity

Total Complexity 16

Size/Duplication

Total Lines 80
Duplicated Lines 36.25 %

Importance

Changes 0
Metric Value
dl 29
loc 80
rs 10
c 0
b 0
f 0
wmc 16

5 Methods

Rating   Name   Duplication   Size   Complexity  
B __init__() 29 29 3
A reconnect() 0 14 2
A qsize() 0 6 2
A get_nowait() 0 9 4
B put_nowait() 0 12 5

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

1
#!/usr/bin/env python
2
# -*- encoding: utf-8 -*-
3
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
4
# Author: Binux<[email protected]>
5
#         http://binux.me
6
# Created on 2012-11-15 17:27:54
7
8
import time
9
import socket
10
import select
11
import logging
12
import umsgpack
13
import threading
14
15
import amqp
16
from six.moves.urllib.parse import unquote
17
try:
18
    from urllib import parse as urlparse
19
except ImportError:
20
    import urlparse
21
from six.moves import queue as BaseQueue
22
23
24
def catch_error(func):
25
    """Catch errors of rabbitmq then reconnect"""
26
    import amqp
27
    try:
28
        import pika.exceptions
29
        connect_exceptions = (
30
            pika.exceptions.ConnectionClosed,
31
            pika.exceptions.AMQPConnectionError,
32
        )
33
    except ImportError:
34
        connect_exceptions = ()
35
36
    connect_exceptions += (
37
        select.error,
38
        socket.error,
39
        amqp.ConnectionError
40
    )
41
42
    def wrap(self, *args, **kwargs):
43
        try:
44
            return func(self, *args, **kwargs)
45
        except connect_exceptions as e:
46
            logging.error('RabbitMQ error: %r, reconnect.', e)
47
            self.reconnect()
48
            return func(self, *args, **kwargs)
49
    return wrap
50
51
52
class PikaQueue(object):
53
    """
54
    A Queue like rabbitmq connector
55
    """
56
57
    Empty = BaseQueue.Empty
58
    Full = BaseQueue.Full
59
    max_timeout = 0.3
60
61 View Code Duplication
    def __init__(self, name, amqp_url='amqp://guest:guest@localhost:5672/%2F',
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
62
                 maxsize=0, lazy_limit=True):
63
        """
64
        Constructor for a PikaQueue.
65
66
        Not works with python 3. Default for python 2.
67
68
        amqp_url:   https://www.rabbitmq.com/uri-spec.html
69
        maxsize:    an integer that sets the upperbound limit on the number of
70
                    items that can be placed in the queue.
71
        lazy_limit: as rabbitmq is shared between multipul instance, for a strict
72
                    limit on the number of items in the queue. PikaQueue have to
73
                    update current queue size before every put operation. When
74
                    `lazy_limit` is enabled, PikaQueue will check queue size every
75
                    max_size / 10 put operation for better performace.
76
        """
77
        self.name = name
78
        self.amqp_url = amqp_url
79
        self.maxsize = maxsize
80
        self.lock = threading.RLock()
81
82
        self.lazy_limit = lazy_limit
83
        if self.lazy_limit and self.maxsize:
84
            self.qsize_diff_limit = int(self.maxsize * 0.1)
85
        else:
86
            self.qsize_diff_limit = 0
87
        self.qsize_diff = 0
88
89
        self.reconnect()
90
91
    def reconnect(self):
92
        """Reconnect to rabbitmq server"""
93
        import pika
94
        import pika.exceptions
95
96
        self.connection = pika.BlockingConnection(pika.URLParameters(self.amqp_url))
97
        self.channel = self.connection.channel()
98
        try:
99
            self.channel.queue_declare(self.name)
100
        except pika.exceptions.ChannelClosed:
101
            self.connection = pika.BlockingConnection(pika.URLParameters(self.amqp_url))
102
            self.channel = self.connection.channel()
103
        #self.channel.queue_purge(self.name)
104
105
    @catch_error
106
    def qsize(self):
107
        with self.lock:
108
            ret = self.channel.queue_declare(self.name, passive=True)
109
        return ret.method.message_count
110
111
    def empty(self):
112
        if self.qsize() == 0:
113
            return True
114
        else:
115
            return False
116
117
    def full(self):
118
        if self.maxsize and self.qsize() >= self.maxsize:
119
            return True
120
        else:
121
            return False
122
123
    @catch_error
124
    def put(self, obj, block=True, timeout=None):
125
        if not block:
126
            return self.put_nowait()
127
128
        start_time = time.time()
129
        while True:
130
            try:
131
                return self.put_nowait(obj)
132
            except BaseQueue.Full:
133
                if timeout:
134
                    lasted = time.time() - start_time
135
                    if timeout > lasted:
136
                        time.sleep(min(self.max_timeout, timeout - lasted))
137
                    else:
138
                        raise
139
                else:
140
                    time.sleep(self.max_timeout)
141
142
    @catch_error
143
    def put_nowait(self, obj):
144
        if self.lazy_limit and self.qsize_diff < self.qsize_diff_limit:
145
            pass
146
        elif self.full():
147
            raise BaseQueue.Full
148
        else:
149
            self.qsize_diff = 0
150
        with self.lock:
151
            self.qsize_diff += 1
152
            return self.channel.basic_publish("", self.name, umsgpack.packb(obj))
153
154
    @catch_error
155
    def get(self, block=True, timeout=None, ack=False):
156
        if not block:
157
            return self.get_nowait()
158
159
        start_time = time.time()
160
        while True:
161
            try:
162
                return self.get_nowait(ack)
163
            except BaseQueue.Empty:
164
                if timeout:
165
                    lasted = time.time() - start_time
166
                    if timeout > lasted:
167
                        time.sleep(min(self.max_timeout, timeout - lasted))
168
                    else:
169
                        raise
170
                else:
171
                    time.sleep(self.max_timeout)
172
173
    @catch_error
174
    def get_nowait(self, ack=False):
175
        with self.lock:
176
            method_frame, header_frame, body = self.channel.basic_get(self.name, not ack)
177
            if method_frame is None:
178
                raise BaseQueue.Empty
179
            if ack:
180
                self.channel.basic_ack(method_frame.delivery_tag)
181
        return umsgpack.unpackb(body)
182
183
    @catch_error
184
    def delete(self):
185
        with self.lock:
186
            return self.channel.queue_delete(queue=self.name)
187
188
189
class AmqpQueue(PikaQueue):
190
    Empty = BaseQueue.Empty
191
    Full = BaseQueue.Full
192
    max_timeout = 0.3
193
194 View Code Duplication
    def __init__(self, name, amqp_url='amqp://guest:guest@localhost:5672/%2F',
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
195
                 maxsize=0, lazy_limit=True):
196
        """
197
        Constructor for a AmqpQueue.
198
199
        Default for python 3.
200
201
        amqp_url:   https://www.rabbitmq.com/uri-spec.html
202
        maxsize:    an integer that sets the upperbound limit on the number of
203
                    items that can be placed in the queue.
204
        lazy_limit: as rabbitmq is shared between multipul instance, for a strict
205
                    limit on the number of items in the queue. PikaQueue have to
206
                    update current queue size before every put operation. When
207
                    `lazy_limit` is enabled, PikaQueue will check queue size every
208
                    max_size / 10 put operation for better performace.
209
        """
210
        self.name = name
211
        self.amqp_url = amqp_url
212
        self.maxsize = maxsize
213
        self.lock = threading.RLock()
214
215
        self.lazy_limit = lazy_limit
216
        if self.lazy_limit and self.maxsize:
217
            self.qsize_diff_limit = int(self.maxsize * 0.1)
218
        else:
219
            self.qsize_diff_limit = 0
220
        self.qsize_diff = 0
221
222
        self.reconnect()
223
224
    def reconnect(self):
225
        """Reconnect to rabbitmq server"""
226
        parsed = urlparse.urlparse(self.amqp_url)
227
        port = parsed.port or 5672
228
        self.connection = amqp.Connection(host="%s:%s" % (parsed.hostname, port),
229
                                          userid=parsed.username or 'guest',
230
                                          password=parsed.password or 'guest',
231
                                          virtual_host=unquote(
232
                                              parsed.path.lstrip('/') or '%2F'))
233
        self.channel = self.connection.channel()
234
        try:
235
            self.channel.queue_declare(self.name)
236
        except amqp.exceptions.PreconditionFailed:
237
            pass
238
        #self.channel.queue_purge(self.name)
239
240
    @catch_error
241
    def qsize(self):
242
        with self.lock:
243
            name, message_count, consumer_count = self.channel.queue_declare(
244
                self.name, passive=True)
245
        return message_count
246
247
    @catch_error
248
    def put_nowait(self, obj):
249
        if self.lazy_limit and self.qsize_diff < self.qsize_diff_limit:
250
            pass
251
        elif self.full():
252
            raise BaseQueue.Full
253
        else:
254
            self.qsize_diff = 0
255
        with self.lock:
256
            self.qsize_diff += 1
257
            msg = amqp.Message(umsgpack.packb(obj))
258
            return self.channel.basic_publish(msg, exchange="", routing_key=self.name)
259
260
    @catch_error
261
    def get_nowait(self, ack=False):
262
        with self.lock:
263
            message = self.channel.basic_get(self.name, not ack)
264
            if message is None:
265
                raise BaseQueue.Empty
266
            if ack:
267
                self.channel.basic_ack(message.delivery_tag)
268
        return umsgpack.unpackb(message.body)
269
270
Queue = AmqpQueue
271