import multiprocessing
from multiprocessing.queues import Queue as MPQueue
from six.moves import queue as BaseQueue


Empty = BaseQueue.Empty
Full = BaseQueue.Full


# The SharedCounter and Queue classes come from:
# https://github.com/vterron/lemon/commit/9ca6b4b


class SharedCounter(object):
    """ A synchronized shared counter.

    The locking done by multiprocessing.Value ensures that only a single
    process or thread may read or write the in-memory ctypes object. However,
    in order to do n += 1, Python performs a read followed by a write, so a
    second process may read the old value before the new one is written by the
    first process. The solution is to use a multiprocessing.Lock to guarantee
    the atomicity of the modifications to Value.

    This class comes almost entirely from Eli Bendersky's blog:
    http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/
    """

    def __init__(self, n=0):
        self.count = multiprocessing.Value('i', n)

    def increment(self, n=1):
        """ Increment the counter by n (default = 1) """
        with self.count.get_lock():
            self.count.value += n

    @property
    def value(self):
        """ Return the value of the counter """
        return self.count.value
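

# Illustrative sketch only, not part of the original module: a minimal check
# that SharedCounter really is safe across processes. Several workers bump the
# same counter; because increment() holds the Value's lock around the
# read-modify-write, no update is lost. The helper names
# _shared_counter_worker and _demo_shared_counter are hypothetical.
def _shared_counter_worker(counter, n):
    for _ in range(n):
        counter.increment(1)


def _demo_shared_counter(num_workers=4, per_worker=1000):
    counter = SharedCounter(0)
    workers = [multiprocessing.Process(target=_shared_counter_worker,
                                       args=(counter, per_worker))
               for _ in range(num_workers)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    # Without the lock in increment(), concurrent n += 1 updates could be
    # lost and this total would come up short.
    assert counter.value == num_workers * per_worker
    return counter.value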


class Queue(BaseQueue.Queue, object):
    """ A portable implementation of multiprocessing.Queue.

    Because of multithreading / multiprocessing semantics, Queue.qsize() may
    raise the NotImplementedError exception on Unix platforms like Mac OS X
    where sem_getvalue() is not implemented. This subclass addresses this
    problem by using a synchronized shared counter (initialized to zero) and
    increasing / decreasing its value every time the put() and get() methods
    are called, respectively. This not only prevents NotImplementedError from
    being raised, but also allows us to implement a reliable version of both
    qsize() and empty().
    """

    def __init__(self, *args, **kwargs):
        super(Queue, self).__init__(*args, **kwargs)
        self.size = SharedCounter(0)

    # queue.Queue invokes the _put() / _get() / _qsize() hooks with its
    # internal lock held, so updating the shared counter here is race-free.
    def _put(self, *args, **kwargs):
        self.size.increment(1)
        super(Queue, self)._put(*args, **kwargs)

    def _get(self, *args, **kwargs):
        v = super(Queue, self)._get(*args, **kwargs)
        self.size.increment(-1)
        return v

    def _qsize(self):
        """ Reliable implementation of multiprocessing.Queue.qsize() """
        return self.size.value
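

# Illustrative sketch only, not part of the original module: exercises the
# thread-backed Queue defined above from a couple of worker threads and checks
# that qsize() and empty() follow the shared counter. The helper name
# _demo_thread_queue is hypothetical.
def _demo_thread_queue(num_items=100, num_threads=2):
    import threading

    q = Queue()
    for item in range(num_items):
        q.put(item)
    assert q.qsize() == num_items

    def drain():
        # Pop items until the queue reports Empty; every successful get()
        # decrements the shared counter through _get().
        while True:
            try:
                q.get_nowait()
            except Empty:
                break

    threads = [threading.Thread(target=drain) for _ in range(num_threads)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    assert q.empty() and q.qsize() == 0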


class MultiProcessingQueue(MPQueue, object):
    """ A multiprocessing.Queue whose qsize() is backed by a SharedCounter
    instead of sem_getvalue(). """

    def __init__(self, *args, **kwargs):
        super(MultiProcessingQueue, self).__init__(*args, **kwargs)
        self.size = SharedCounter(0)

    # Unlike queue.Queue, multiprocessing.queues.Queue provides no _put() /
    # _get() hooks, so the shared counter has to be maintained in the public
    # put() and get() methods themselves.
    def put(self, *args, **kwargs):
        self.size.increment(1)
        super(MultiProcessingQueue, self).put(*args, **kwargs)

    def get(self, *args, **kwargs):
        v = super(MultiProcessingQueue, self).get(*args, **kwargs)
        self.size.increment(-1)
        return v

    def qsize(self):
        """ Reliable implementation of multiprocessing.Queue.qsize() """
        return self.size.value


def get_multiprocessing_queue(maxsize=0):
    """ Build a MultiProcessingQueue, passing an explicit context on
    Python 3.4+, where multiprocessing.queues.Queue requires one. """
    if hasattr(multiprocessing, 'get_context'):  # Python 3.4+
        return MultiProcessingQueue(maxsize,
                                    ctx=multiprocessing.get_context())
    else:
        return MultiProcessingQueue(maxsize=maxsize)
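

# Illustrative sketch only, not part of the original module: a small
# __main__-guarded demo of get_multiprocessing_queue(). qsize() is answered by
# the SharedCounter, so it also works on platforms where sem_getvalue() is not
# implemented (e.g. Mac OS X).
if __name__ == '__main__':
    demo_queue = get_multiprocessing_queue()
    for item in range(5):
        demo_queue.put(item)
    print('queue size after 5 puts: %d' % demo_queue.qsize())
    print('items: %r' % [demo_queue.get() for _ in range(5)])
    print('queue size after draining: %d' % demo_queue.qsize())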