Completed
Push — master ( 22cef5...ed871f )
by Roy
01:09
created

pyspider.libs.MultiProcessingQueue   A

Complexity

Total Complexity 4

Size/Duplication

Total Lines 27
Duplicated Lines 0 %
Metric Value
dl 0
loc 27
rs 10
wmc 4

4 Methods

Rating   Name   Duplication   Size   Complexity  
A put() 0 3 1
A qsize() 0 3 1
A __init__() 0 3 1
A get() 0 4 1
1
import six
2
import platform
3
import multiprocessing
4
from multiprocessing.queues import Queue as BaseQueue
5
6
7
# The SharedCounter and Queue classes come from:
# https://github.com/vterron/lemon/commit/9ca6b4b
9
10
class SharedCounter(object):
    """Integer counter that can be shared between processes.

    The value lives in a ``multiprocessing.Value``. That wrapper guards each
    individual read or write, but an update such as ``n += 1`` is a read
    followed by a separate write, so another process could observe the old
    value in between. Every update here is therefore performed while holding
    the lock that belongs to the ``Value``, which makes the whole
    read-modify-write step atomic.

    The approach follows Eli Bendersky's blog post:
    http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/
    """

    def __init__(self, n=0):
        """Create the counter with the initial value ``n`` (default 0)."""
        # 'i' selects a signed C int as the backing storage type.
        self.count = multiprocessing.Value('i', n)

    def increment(self, n=1):
        """Add ``n`` to the counter (default 1); a negative ``n`` subtracts."""
        shared = self.count
        # Hold the Value's own lock across both the read and the write.
        with shared.get_lock():
            shared.value = shared.value + n

    @property
    def value(self):
        """Current value of the counter."""
        shared = self.count
        return shared.value
34
35
36
class MultiProcessingQueue(BaseQueue):
    """A portable implementation of multiprocessing.Queue.

    Because of multithreading / multiprocessing semantics, Queue.qsize() may
    raise the NotImplementedError exception on Unix platforms like Mac OS X
    where sem_getvalue() is not implemented. This subclass addresses this
    problem by keeping a synchronized shared counter (initialized to zero)
    that is increased by every put() and decreased by every get(), and by
    answering qsize() from that counter instead of from the semaphore.

    Only qsize() is overridden; empty() and full() are inherited unchanged
    from the base class.
    """

    def __init__(self, *args, **kwargs):
        """Build the underlying queue and start the item counter at zero.

        All arguments are forwarded untouched to the base queue constructor.
        """
        super(MultiProcessingQueue, self).__init__(*args, **kwargs)
        self.size = SharedCounter(0)

    def __getstate__(self):
        """Return the base queue's pickled state with the counter appended.

        The base class serializes an explicit tuple of its own fields, so
        without this override ``size`` would be dropped whenever the queue
        is handed to a spawned child process.
        """
        return super(MultiProcessingQueue, self).__getstate__() + (self.size,)

    def __setstate__(self, state):
        """Restore the base queue state and the counter added by __getstate__."""
        super(MultiProcessingQueue, self).__setstate__(state[:-1])
        self.size = state[-1]

    def put(self, *args, **kwargs):
        """Enqueue an item and count it.

        Arguments are forwarded to the base ``put()``. Whatever the base
        ``put()`` raises is re-raised unchanged.
        """
        # Count first, as the original did, then undo the increment if the
        # item never made it into the queue, so qsize() does not drift.
        self.size.increment(1)
        try:
            super(MultiProcessingQueue, self).put(*args, **kwargs)
        except BaseException:
            self.size.increment(-1)
            raise

    def get(self, *args, **kwargs):
        """Dequeue an item, decrease the count and return the item.

        Arguments are forwarded to the base ``get()``. If the base ``get()``
        raises, the counter is left untouched because nothing was removed.
        """
        item = super(MultiProcessingQueue, self).get(*args, **kwargs)
        self.size.increment(-1)
        return item

    def qsize(self):
        """Return the number of items counted by put() and not yet get()."""
        return self.size.value
63
64
65
# Choose the Queue factory exported by this module. On Mac OS X the stock
# qsize() may raise NotImplementedError, so the counter-backed subclass is
# used there; every other platform gets the standard library queue.
if platform.system() == 'Darwin':
    if hasattr(multiprocessing, 'get_context'):  # for py34
        def Queue(maxsize=0):
            """Return a MultiProcessingQueue bound to the default context."""
            # When get_context exists, the base constructor is given an
            # explicit ctx.
            return MultiProcessingQueue(maxsize, ctx=multiprocessing.get_context())
    else:
        def Queue(maxsize=0):
            """Return a MultiProcessingQueue holding at most ``maxsize`` items."""
            return MultiProcessingQueue(maxsize)
else:
    # The previous import named a module called "MultiProcessingQueue",
    # which does not exist; the fallback is the standard library queue.
    from multiprocessing import Queue  # flake8: noqa
74