Completed
Push — master ( d77c87...01fcff )
by Roy
01:09
created

pyspider.libs.MultiProcessingQueue._qsize()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 3
rs 10
1
import multiprocessing
2
from multiprocessing.queues import Queue as MPQueue
3
from six.moves import queue as BaseQueue
4
5
6
Empty = BaseQueue.Empty
7
Full = BaseQueue.Full
8
9
10
# The SharedCounter and Queue classes come from:
11
# https://github.com/vterron/lemon/commit/9ca6b4b
12
13
class SharedCounter(object):
    """ An integer counter backed by a multiprocessing.Value.

    multiprocessing.Value serialises each individual read or write of the
    underlying ctypes object, but "n += 1" is a read followed by a separate
    write, so another process could read the stale value in between. Every
    update is therefore performed while holding the lock that belongs to the
    Value, which makes the whole read-modify-write step atomic.

    The approach is taken from Eli Bendersky's blog:
    http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/
    """

    def __init__(self, n=0):
        # 'i' is the typecode for a signed C int; n is the starting value.
        self.count = multiprocessing.Value('i', n)

    def increment(self, n=1):
        """ Add n to the counter (default = 1); a negative n decreases it """
        lock = self.count.get_lock()
        with lock:
            self.count.value = self.count.value + n

    @property
    def value(self):
        """ Current value of the counter """
        return self.count.value
37
38
39
class Queue(BaseQueue.Queue, object):
    """ A queue.Queue whose size is tracked by a SharedCounter.

    The base class is the thread queue from six.moves.queue. Its public
    put(), get(), qsize(), empty() and full() methods take the queue's
    internal lock and then call the _put(), _get() and _qsize() hooks, so
    only the hooks are overridden here: every stored item increments the
    counter, every removed item decrements it, and the reported size is the
    counter's value.

    The hooks must delegate to the base class *hooks*. Calling the public
    put() / get() from inside them would try to take the (non-reentrant)
    queue lock a second time and block forever.
    """

    def __init__(self, *args, **kwargs):
        """ Accepts the same arguments as queue.Queue (i.e. maxsize) """
        super(Queue, self).__init__(*args, **kwargs)
        self.size = SharedCounter(0)

    def _put(self, *args, **kwargs):
        """ Store one item and count it; called by put() with the lock held """
        # Store first so the counter is only bumped for items really queued.
        super(Queue, self)._put(*args, **kwargs)
        self.size.increment(1)

    def _get(self, *args, **kwargs):
        """ Remove and return one item; called by get() with the lock held """
        v = super(Queue, self)._get(*args, **kwargs)
        self.size.increment(-1)
        return v

    def _qsize(self):
        """ Number of queued items, as recorded by the shared counter """
        return self.size.value
67
68
69
class MultiProcessingQueue(MPQueue, object):
    """ multiprocessing.queues.Queue with a counter-backed qsize().

    The stock qsize() may raise NotImplementedError on platforms where
    sem_getvalue() is not implemented (e.g. Mac OS X). This subclass keeps a
    SharedCounter that is increased by put() and decreased by get(), and
    reports that counter from qsize() instead.

    Unlike the thread queue, multiprocessing.queues.Queue never calls
    _put() / _get() / _qsize() hooks, so the public methods themselves are
    overridden. The underscore names are kept as aliases for callers that
    used them directly.
    """

    def __init__(self, *args, **kwargs):
        """ Accepts the same arguments as multiprocessing.queues.Queue """
        super(MultiProcessingQueue, self).__init__(*args, **kwargs)
        self.size = SharedCounter(0)

    def __getstate__(self):
        # The base class pickles only its own fields; carry the counter along
        # so a queue handed to a spawned process still has its ``size``.
        return (super(MultiProcessingQueue, self).__getstate__(), self.size)

    def __setstate__(self, state):
        base_state, self.size = state
        super(MultiProcessingQueue, self).__setstate__(base_state)

    def put(self, *args, **kwargs):
        """ Put an item on the queue and count it.

        Takes the same arguments as multiprocessing.queues.Queue.put() and
        raises the same exceptions (e.g. Full); the count is rolled back when
        the item could not be queued.
        """
        self.size.increment(1)
        try:
            super(MultiProcessingQueue, self).put(*args, **kwargs)
        except Exception:
            # Nothing was queued: undo the optimistic increment.
            self.size.increment(-1)
            raise

    def get(self, *args, **kwargs):
        """ Remove and return an item, decreasing the count.

        Takes the same arguments as multiprocessing.queues.Queue.get(); the
        count is left untouched when the base get() raises (e.g. Empty).
        """
        v = super(MultiProcessingQueue, self).get(*args, **kwargs)
        self.size.increment(-1)
        return v

    def qsize(self):
        """ Reliable implementation of multiprocessing.Queue.qsize() """
        return self.size.value

    # Backward-compatible aliases for the previous method names.
    _put = put
    _get = get
    _qsize = qsize
86
87
88
def get_multiprocessing_queue(maxsize=0):
    """ Build a MultiProcessingQueue on both old and new Pythons.

    When the multiprocessing module exposes get_context() (python 3.4+), the
    queue is created with the default context passed as ``ctx``. Otherwise
    only ``maxsize`` is passed.
    """
    get_context = getattr(multiprocessing, 'get_context', None)
    if get_context is None:
        return MultiProcessingQueue(maxsize=maxsize)
    return MultiProcessingQueue(maxsize, ctx=get_context())
94