Completed
Push — file-actions ( 3503b9...9b9199 )
by Felipe A.
02:57
created

BlockingPipe   A

Complexity

Total Complexity 13

Size/Duplication

Total Lines 81
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 13
dl 0
loc 81
rs 10
c 0
b 0
f 0

4 Methods

Rating   Name   Duplication   Size   Complexity  
A __init__() 0 5 1
A write() 0 17 3
B close() 0 20 5
A retrieve() 0 17 4
1
2
import os
3
import os.path
4
import tarfile
5
import threading
6
import functools
7
8
import browsepy.compat as compat
9
10
11
class BlockingPipeAbort(RuntimeError):
    '''
    Exception used internally by the default :class:`BlockingPipe`
    implementation.

    It is raised by the pipe's `write` and `retrieve` methods once the pipe
    has been closed.
    '''
17
18
class BlockingPipe(object):
    '''
    Minimal pipe class with `write`, `retrieve` and `close` blocking methods.

    This class implementation assumes that :attr:`pipe_class` (set to a
    ``compat.Queue`` created with ``maxsize=1`` in current implementation)
    instances have both `put` and `get` blocking methods.

    Due to its blocking implementation, this class is only compatible with
    python's threading module, any other approach (like coroutines) will
    require adapting this class (via inheritance or implementation).

    This class exposes :meth:`write` for :class:`tarfile.TarFile`
    `fileobj` compatibility.
    '''
    lock_class = threading.Lock
    # maxsize=1 is forwarded to the queue constructor; presumably this makes
    # `put` block while one chunk is still pending (stdlib Queue semantics).
    pipe_class = functools.partial(compat.Queue, maxsize=1)
    abort_exception = BlockingPipeAbort

    def __init__(self):
        '''
        Create the underlying pipe and one lock per direction.
        '''
        self._pipe = self.pipe_class()
        self._wlock = self.lock_class()  # held while a thread is in write()
        self._rlock = self.lock_class()  # held while a thread is in retrieve()
        self.closed = False

    def write(self, data):
        '''
        Put chunk of data onto pipe.
        This method blocks if pipe is already full.

        A writer which is blocked while :meth:`close` is called gets
        unblocked and returns normally, further writes will raise.

        :param data: bytes to write to pipe
        :type data: bytes
        :returns: number of bytes written
        :rtype: int
        :raises BlockingPipeAbort: (:attr:`abort_exception`) if already closed
        '''
        with self._wlock:
            if self.closed:
                raise self.abort_exception()
            self._pipe.put(data)
            return len(data)

    def retrieve(self):
        '''
        Get chunk of data from pipe.
        This method blocks if pipe is empty.

        :returns: data chunk
        :rtype: bytes
        :raises BlockingPipeAbort: (:attr:`abort_exception`) if already
                                   closed or closed while blocking
        '''
        with self._rlock:
            if self.closed:
                raise self.abort_exception()
            data = self._pipe.get()
            if data is None:
                # None is the wake-up marker put by close()
                raise self.abort_exception()
            return data

    def close(self):
        '''
        Closes, so any blocked and future writes or retrieves will stop
        blocking and future calls will raise :attr:`abort_exception`
        instances.
        '''
        self.closed = True

        # A lock which cannot be taken without blocking is being held by a
        # thread currently inside retrieve() / write(). The flag is passed
        # positionally, which every `acquire` signature accepts.
        reading = not self._rlock.acquire(False)
        writing = not self._wlock.acquire(False)

        try:
            if writing and not reading:
                # nobody is consuming: take one chunk out so the writer
                # blocked on `put` can finish
                self._pipe.get()
        finally:
            if not reading:
                self._rlock.release()

        try:
            if reading and not writing:
                # nobody is producing: wake the reader blocked on `get`
                # with the abort marker
                self._pipe.put(None)
        finally:
            if not writing:
                self._wlock.release()
99
100
101
class TarFileStream(object):
    '''
    Tarfile which compresses while reading for streaming.

    Buffsize can be provided, it should be a multiple of 512 (the tar block
    size) and will be used as tarfile block size.

    Note on coroutines: this class uses threading by default, but
    coroutine-based applications can change this behavior overriding the
    :attr:`pipe_class` and :attr:`thread_class` values.
    '''

    pipe_class = BlockingPipe
    abort_exception = BlockingPipe.abort_exception
    thread_class = threading.Thread
    tarfile_class = tarfile.open

    # compression method -> file extension (anything else falls back to 'tar')
    extensions = {
        'gz': 'tgz',
        'bz2': 'tar.bz2',
        'xz': 'tar.xz',
        }

    @property
    def name(self):
        '''
        Filename for the archive: basename of :attr:`path` plus the
        extension matching the compression method.

        :returns: archive file name
        :rtype: str
        '''
        return '%s.%s' % (
            os.path.basename(self.path),
            self.extensions.get(self._compress, 'tar')
            )

    def __init__(self, path, buffsize=10240, exclude=None, compress='gz'):
        '''
        The compression thread is created here, but it is only started on
        the first :meth:`__next__` call, and will be pausing until consumed.

        Note that compress parameter will be ignored if buffsize is below 16.

        :param path: local path of directory whose content will be compressed.
        :type path: str
        :param buffsize: byte size of tarfile blocks, defaults to 10KiB
        :type buffsize: int
        :param exclude: absolute path filtering function, defaults to None
        :type exclude: callable
        :param compress: compression method ('gz', 'bz2', 'xz' or None)
        :type compress: str or None
        '''
        self.path = path
        self.exclude = exclude

        self._started = False
        self._buffsize = buffsize
        self._compress = compress if compress and buffsize > 15 else ''
        self._writable = self.pipe_class()
        self._th = self.thread_class(target=self._fill)

    def _close_archive(self, archive):
        '''
        Close given tarfile instance (if any) ignoring pipe aborts.

        Closing flushes pending data through the pipe, so this is expected
        to raise :attr:`abort_exception` when the pipe is already closed.

        :param archive: tarfile instance, or None if it was never created
        '''
        if archive is None:
            return
        try:
            archive.close()
        except self.abort_exception:
            pass

    def _fill(self):
        '''
        Writes data on internal tarfile instance, which writes to the
        internal pipe, using its `write` method.

        As this method is blocking, it is used inside a thread.

        This method is called automatically, on a thread, when iteration
        starts, so there is little need to call it manually.
        '''
        exclude = self.exclude
        path_join = os.path.join
        path = self.path

        def infofilter(info):
            return None if exclude(path_join(path, info.name)) else info

        archive = None
        try:
            archive = self.tarfile_class(
                fileobj=self._writable,
                mode='w|{}'.format(self._compress),
                bufsize=self._buffsize
                )
            archive.add(path, "", filter=infofilter if exclude else None)
            archive.close()  # force stream flush
        except self.abort_exception:
            # expected exception when pipe is closed prematurely
            self._close_archive(archive)  # free fd
        except Exception:
            # unexpected failure: close the pipe so the consumer is not
            # left blocked waiting for data, then let the error surface
            self.close()
            self._close_archive(archive)
            raise
        else:
            # NOTE(review): the pipe is closed right after the last write;
            # if the consumer has not retrieved that final chunk yet,
            # BlockingPipe.retrieve looks like it would raise before
            # returning it - confirm trailing data cannot be dropped.
            self.close()

    def __next__(self):
        '''
        Pulls chunk from tarfile (which is processed on its own thread).

        The thread is started on the first call.

        :returns: tarfile data as bytes
        :rtype: bytes
        :raises StopIteration: when the internal pipe has been closed
        '''
        if not self._started:
            self._started = True
            self._th.start()

        try:
            return self._writable.retrieve()
        except self.abort_exception:
            raise StopIteration()

    def __iter__(self):
        '''
        This class itself implements iterable protocol, so iter() returns
        this instance itself.

        :returns: instance itself
        :rtype: TarFileStream
        '''
        return self

    def close(self):
        '''
        Finish processing aborting any pending write.
        '''
        self._writable.close()
220