1
|
|
|
import multiprocessing |
2
|
|
|
from multiprocessing.queues import Queue as MPQueue |
3
|
|
|
from six.moves import queue as BaseQueue |
4
|
|
|
|
5
|
|
|
|
6
|
|
|
Empty = BaseQueue.Empty |
7
|
|
|
Full = BaseQueue.Full |
8
|
|
|
|
9
|
|
|
|
10
|
|
|
# The SharedCounter and Queue classes come from: |
11
|
|
|
# https://github.com/vterron/lemon/commit/9ca6b4b |
12
|
|
|
|
13
|
|
|
class SharedCounter(object):
    """ An integer counter that can be shared between processes.

    The counter lives in a multiprocessing.Value. An in-place addition is
    really a read followed by a write, so two processes could both read the
    same old value and one of the updates would be lost. To prevent that,
    increment() performs the whole read-modify-write while holding the lock
    attached to the Value.

    This class comes almost entirely from Eli Bendersky's blog:
    http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/
    """

    def __init__(self, n=0):
        # 'i' is the typecode of a signed C int; n is the initial value.
        self.count = multiprocessing.Value('i', n)

    def increment(self, n=1):
        """ Add n to the counter (default = 1); n may be negative """
        shared = self.count
        lock = shared.get_lock()
        with lock:
            # Read and write under the same lock so no update is lost.
            shared.value = shared.value + n

    @property
    def value(self):
        """ The current value of the counter """
        return self.count.value
37
|
|
|
|
38
|
|
|
|
39
|
|
|
class Queue(BaseQueue.Queue, object):
    """ A thread queue whose size is tracked by a SharedCounter.

    This is the in-process counterpart of the multiprocessing queue: it
    subclasses the standard library queue (six.moves.queue.Queue) and keeps
    a synchronized counter, initialized to zero, that is increased every
    time an item is stored and decreased every time one is removed.
    qsize(), empty() and full() of the base class all go through the
    _qsize() hook, so they report the value of that counter.

    The base class calls _put(), _get() and _qsize() from put(), get() and
    qsize() while it is holding its (non-reentrant) mutex. The overrides
    below must therefore delegate to the base *hooks*: calling the public
    put() / get() from inside a hook would try to take the mutex a second
    time and block forever.
    """

    def __init__(self, *args, **kwargs):
        super(Queue, self).__init__(*args, **kwargs)
        self.size = SharedCounter(0)

    def _put(self, *args, **kwargs):
        """ Store the item in the underlying container and count it """
        # Delegate to the base hook, not to put(): we are already inside
        # put() here, with the queue mutex held.
        super(Queue, self)._put(*args, **kwargs)
        self.size.increment(1)

    def _get(self, *args, **kwargs):
        """ Remove and return the next item, decreasing the counter """
        # Same as _put(): use the base hook, get() already holds the mutex.
        v = super(Queue, self)._get(*args, **kwargs)
        self.size.increment(-1)
        return v

    def _qsize(self):
        """ Number of items currently stored, as tracked by the counter """
        return self.size.value
67
|
|
|
|
68
|
|
|
|
69
|
|
|
class MultiProcessingQueue(MPQueue, object):
    """ A multiprocessing queue with a portable qsize().

    multiprocessing's Queue.qsize() may raise NotImplementedError on
    platforms such as Mac OS X where sem_getvalue() is not implemented.
    This subclass keeps its own SharedCounter (initialized to zero), which
    is increased by put() and decreased by get(), and qsize() simply
    returns the value of that counter.

    Unlike the thread queue, the multiprocessing queue has no
    _put() / _get() / _qsize() hooks, so the public methods themselves are
    overridden. The underscore-prefixed names are kept as aliases for
    backward compatibility.
    """

    def __init__(self, *args, **kwargs):
        super(MultiProcessingQueue, self).__init__(*args, **kwargs)
        self.size = SharedCounter(0)

    def __getstate__(self):
        # The base class only pickles its own attributes. Ship the counter
        # along with them so that a queue handed to a spawned child process
        # keeps sharing the same counter.
        base_state = super(MultiProcessingQueue, self).__getstate__()
        return (base_state, self.size)

    def __setstate__(self, state):
        base_state, self.size = state
        super(MultiProcessingQueue, self).__setstate__(base_state)

    def put(self, *args, **kwargs):
        """ Put an item into the queue and count it.

        Accepts the same arguments as the base put(). If the base put()
        raises (for instance Full on a non-blocking or timed-out call), the
        counter is restored before the exception is re-raised.
        """
        # Count first, so a consumer that gets the item right away never
        # drives the counter below zero.
        self.size.increment(1)
        try:
            super(MultiProcessingQueue, self).put(*args, **kwargs)
        except BaseException:
            # Nothing was enqueued: undo the increment.
            self.size.increment(-1)
            raise

    def get(self, *args, **kwargs):
        """ Remove and return an item, decreasing the counter.

        Accepts the same arguments as the base get(). The counter is only
        touched once an item has actually been retrieved, so an Empty
        exception leaves it unchanged.
        """
        v = super(MultiProcessingQueue, self).get(*args, **kwargs)
        self.size.increment(-1)
        return v

    def qsize(self):
        """ Reliable implementation of multiprocessing.Queue.qsize() """
        return self.size.value

    # Former (private) names of the methods above, kept as aliases.
    _put = put
    _get = get
    _qsize = qsize
86
|
|
|
|
87
|
|
|
|
88
|
|
|
def get_multiprocessing_queue(maxsize=0):
    """ Build a MultiProcessingQueue in a way that works on py2 and py3.

    When multiprocessing provides get_context() (python 3.4), the queue
    constructor is given the context returned by it through the ctx
    keyword; otherwise only maxsize is passed.
    """
    get_context = getattr(multiprocessing, 'get_context', None)
    if get_context is None:
        return MultiProcessingQueue(maxsize=maxsize)
    # python 3.4
    return MultiProcessingQueue(maxsize, ctx=get_context())
94
|
|
|
|