import six
import platform
import multiprocessing
from multiprocessing.queues import Queue as BaseQueue


# The SharedCounter and Queue classes come from:
# https://github.com/vterron/lemon/commit/9ca6b4b


class SharedCounter(object):
    """ A synchronized shared counter.

    The locking done by multiprocessing.Value ensures that only a single
    process or thread may read or write the in-memory ctypes object. However,
    in order to do n += 1, Python performs a read followed by a write, so a
    second process may read the old value before the new one is written by the
    first process. The solution is to use a multiprocessing.Lock to guarantee
    the atomicity of the modifications to Value.

    This class comes almost entirely from Eli Bendersky's blog:
    http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/
    """

    def __init__(self, n=0):
        self.count = multiprocessing.Value('i', n)

    def increment(self, n=1):
        """ Increment the counter by n (default = 1) """
        with self.count.get_lock():
            self.count.value += n

    @property
    def value(self):
        """ Return the value of the counter """
        return self.count.value

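
# A minimal usage sketch (illustrative only, not part of the module API, and
# assuming a fork-based start method so that child processes inherit the
# shared Value): concurrent increments stay correct because increment()
# holds the Value's lock around the read-modify-write.
#
#     counter = SharedCounter(0)
#     workers = [multiprocessing.Process(target=counter.increment)
#                for _ in range(4)]
#     for w in workers:
#         w.start()
#     for w in workers:
#         w.join()
#     assert counter.value == 4
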

class MultiProcessingQueue(BaseQueue):
    """ A portable implementation of multiprocessing.Queue.

    Because of multithreading / multiprocessing semantics, Queue.qsize() may
    raise the NotImplementedError exception on Unix platforms like Mac OS X
    where sem_getvalue() is not implemented. This subclass addresses this
    problem by using a synchronized shared counter (initialized to zero) and
    increasing / decreasing its value every time the put() and get() methods
    are called, respectively. This not only prevents NotImplementedError from
    being raised, but also allows us to implement a reliable version of both
    qsize() and empty().
    """

    def __init__(self, *args, **kwargs):
        super(MultiProcessingQueue, self).__init__(*args, **kwargs)
        self.size = SharedCounter(0)

    def put(self, *args, **kwargs):
        self.size.increment(1)
        super(MultiProcessingQueue, self).put(*args, **kwargs)

    def get(self, *args, **kwargs):
        v = super(MultiProcessingQueue, self).get(*args, **kwargs)
        self.size.increment(-1)
        return v

    def qsize(self):
        """ Reliable implementation of multiprocessing.Queue.qsize() """
        return self.size.value

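
# Note: on Mac OS X the standard multiprocessing.Queue still works for put()
# and get(), but its qsize() raises NotImplementedError because sem_getvalue()
# is not implemented there, so the factory below only swaps in the
# counter-backed subclass on Darwin and keeps the stock Queue everywhere else.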
if platform.system() == 'Darwin':
    if hasattr(multiprocessing, 'get_context'):  # Python 3.4+ requires a ctx
        def Queue(maxsize=0):
            return MultiProcessingQueue(maxsize,
                                        ctx=multiprocessing.get_context())
    else:
        def Queue(maxsize=0):
            return MultiProcessingQueue(maxsize)
else:
    from multiprocessing import Queue  # flake8: noqa
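
# A minimal usage sketch (illustrative only): on every platform the
# module-level Queue above yields a queue whose qsize() does not raise
# NotImplementedError.
#
#     q = Queue()
#     q.put('item')
#     assert q.qsize() == 1
#     q.get()
#     assert q.qsize() == 0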