Completed
Push — file-actions ( fb4fef...9d04f1 )
by Felipe A.
01:06 queued 11s
created

TarFileStream.__init__()   B

Complexity

Conditions 2

Size

Total Lines 25

Duplication

Lines 0
Ratio 0 %

Importance

Changes 9
Bugs 0 Features 3
Metric Value
c 9
b 0
f 3
dl 0
loc 25
rs 8.8571
cc 2
1
2
import os
3
import os.path
4
import tarfile
5
import threading
6
import functools
7
8
from . import compat
9
10
11
class BlockingPipeAbort(RuntimeError):
    '''
    Exception used internally by :class:`BlockingPipe`'s default
    implementation.

    It is raised by the pipe's `write` and `retrieve` methods once the pipe
    has been closed.
    '''
17
18
19
class BlockingPipe(object):
    '''
    Minimal pipe class with `write`, `retrieve` and `close` blocking methods.

    This class implementation assumes that :attr:`pipe_class` (set as
    :class:`queue.Queue` in current implementation) instances have blocking
    `put` and `get` methods, plus the `full` and `empty` checks used by
    :meth:`close`.

    Due to its blocking implementation, this class uses :mod:`threading`.

    This class exposes :meth:`write` for :class:`tarfile.TarFile`
    `fileobj` compatibility.
    '''
    lock_class = threading.Lock
    # pipe bounded to a single pending chunk (maxsize=1)
    pipe_class = functools.partial(compat.Queue, maxsize=1)
    abort_exception = BlockingPipeAbort

    def __init__(self):
        self._pipe = self.pipe_class()
        self._wlock = self.lock_class()
        self._rlock = self.lock_class()
        # set last: close() treats a missing flag as "nothing to close"
        self.closed = False

    def write(self, data):
        '''
        Put chunk of data onto pipe.
        This method blocks if pipe is already full.

        :param data: bytes to write to pipe
        :type data: bytes
        :returns: number of bytes written
        :rtype: int
        :raises BlockingPipeAbort: (:attr:`abort_exception`) if the pipe is
                                   already closed
        '''

        with self._wlock:
            if self.closed:
                raise self.abort_exception()
            self._pipe.put(data)
            return len(data)

    def retrieve(self):
        '''
        Get chunk of data from pipe.
        This method blocks if pipe is empty.

        :returns: data chunk
        :rtype: bytes
        :raises BlockingPipeAbort: (:attr:`abort_exception`) if already
                                   closed or closed while blocking
        '''

        with self._rlock:
            if self.closed:
                raise self.abort_exception()
            data = self._pipe.get()
            # None is the sentinel close() sends to wake a blocked reader
            if data is None:
                raise self.abort_exception()
            return data

    def __del__(self):
        '''
        Call :meth:`BlockingPipe.close`.
        '''
        self.close()

    def close(self):
        '''
        Closes, so future writes or retrieves will raise
        :attr:`abort_exception` instances.

        A write blocked on a full pipe is released by discarding one queued
        chunk, and a retrieve blocked on an empty pipe is woken up with a
        `None` sentinel, which makes it raise :attr:`abort_exception`.

        Calling this method more than once is harmless.
        '''
        # getattr: __del__ may run on an instance whose __init__ did not
        # finish, in which case there is nothing to close
        if getattr(self, 'closed', True):
            return
        self.closed = True

        # a lock which cannot be taken is held by a thread which is
        # currently inside retrieve() / write()
        reading = not self._rlock.acquire(False)
        writing = not self._wlock.acquire(False)

        if not reading:
            # The read lock is ours, so retrieve() cannot take items out:
            # a full pipe stays full and get() returns immediately, making
            # room for the writer. If the pipe is not full the writer is
            # not blocked, and an unconditional get() could wait forever.
            if writing and self._pipe.full():
                self._pipe.get()
            self._rlock.release()

        if not writing:
            # The write lock is ours, so write() cannot add items: an
            # empty pipe stays empty and put() returns immediately, waking
            # the reader. If the pipe is not empty the reader is not
            # blocked, and an unconditional put() could wait forever.
            if reading and self._pipe.empty():
                self._pipe.put(None)
            self._wlock.release()
105
106
107
class TarFileStream(compat.Iterator):
    '''
    Iterator class which yields tarfile chunks for streaming.

    This class implements :class:`collections.abc.Iterator` interface
    with :meth:`close`, so it can be appropriately handled by wsgi servers
    (`PEP 333 <https://www.python.org/dev/peps/pep-0333>`_).

    Buffsize can be provided, it should be a multiple of 512 (the tar block
    size) and will be used as tarfile block size.

    This class uses :mod:`threading` for offloading.
    '''

    pipe_class = BlockingPipe
    abort_exception = BlockingPipe.abort_exception
    thread_class = threading.Thread
    tarfile_class = tarfile.open

    mimetype = 'application/x-tar'
    # compress option -> (tarfile stream mode suffix, filename extension)
    # NOTE: attribute name is misspelled, kept as is for compatibility
    compresion_modes = {
        None: ('', 'tar'),
        'gzip': ('gz', 'tgz'),
        'bzip2': ('bz2', 'tar.bz2'),
        'xz': ('xz', 'tar.xz'),
        }

    @property
    def name(self):
        '''
        Filename generated from given path and compression method.
        '''
        return '%s.%s' % (os.path.basename(self.path), self._extension)

    @property
    def encoding(self):
        '''
        Compression method in use (one of :attr:`compresion_modes` keys),
        or None if the tarfile is not compressed.
        '''
        return self._compress

    def __init__(self, path, buffsize=10240, exclude=None, compress='gzip'):
        '''
        Initialize thread and class (thread is not started until iterated.)
        Note that compress parameter will be ignored if buffsize is below 16.

        :param path: local path of directory whose content will be compressed.
        :type path: str
        :param buffsize: byte size of tarfile blocks, defaults to 10KiB
        :type buffsize: int
        :param exclude: absolute path filtering function, defaults to None
        :type exclude: callable
        :param compress: compression method ('gzip', 'bzip2', 'xz' or None)
        :type compress: str or None
        :raises KeyError: if compress is not a key of
                          :attr:`compresion_modes`
        '''
        self.path = path
        self.exclude = exclude

        self._started = False
        self._buffsize = buffsize

        self._compress = compress if compress and buffsize > 15 else None
        self._mode, self._extension = self.compresion_modes[self._compress]

        self._pipe = self.pipe_class()
        self._th = self.thread_class(target=self._fill)

    def _fill(self):
        '''
        Writes data on internal tarfile instance, which writes to the
        internal pipe using its `write` method.

        As this method is blocking, it is used inside a thread.

        This method is called automatically, on a thread started by the
        first :meth:`__next__` call, so there is little need to call it
        manually.
        '''
        exclude = self.exclude
        path_join = os.path.join
        path = self.path

        def infofilter(info):
            return None if exclude(path_join(path, info.name)) else info

        archive = None
        try:
            archive = self.tarfile_class(
                fileobj=self._pipe,
                mode='w|{}'.format(self._mode),
                bufsize=self._buffsize
                )
            archive.add(self.path, "", filter=infofilter if exclude else None)
            archive.close()  # force stream flush
        except self.abort_exception:
            # expected exception when pipe is closed prematurely
            if archive is not None:
                try:
                    archive.close()  # free fd
                except self.abort_exception:
                    # closing may try to flush into the closed pipe
                    pass
        finally:
            # Always close the pipe, even on unexpected errors (which are
            # still propagated), otherwise a consumer blocked on retrieve
            # would wait forever.
            # NOTE(review): the pipe is closed right after the last write,
            # and BlockingPipe.retrieve checks `closed` before reading, so
            # a chunk still queued at this point may never be delivered to
            # a slow consumer - confirm.
            self.close()

    def __next__(self):
        '''
        Pulls chunk from tarfile (which is processed on its own thread).

        The thread is started on the first call.

        :returns: tarfile data as bytes
        :rtype: bytes
        :raises StopIteration: once the internal pipe has been closed
        '''
        if not self._started:
            self._started = True
            self._th.start()

        try:
            return self._pipe.retrieve()
        except self.abort_exception:
            raise StopIteration()

    def close(self):
        '''
        Closes tarfile pipe and stops further processing.
        '''
        # getattr: __del__ may run on an instance whose __init__ failed
        # before the pipe was created
        pipe = getattr(self, '_pipe', None)
        if pipe is not None:
            pipe.close()

    def __del__(self):
        '''
        Call :meth:`TarFileStream.close`.
        '''
        self.close()
235