Completed
Push — file-actions ( 9b9199...28a134 )
by Felipe A.
33s
created

TarFileStream.__del__()   A

Complexity

Conditions 1

Size

Total Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 5
rs 9.4285
cc 1
1
2
import os
3
import os.path
4
import tarfile
5
import threading
6
import functools
7
8
import browsepy.compat as compat
9
10
11
class BlockingPipeAbort(RuntimeError):
    '''
    Exception used internally by :class:`BlockingPipe`'s default
    implementation.
    '''
    pass


class BlockingPipe(object):
    '''
    Minimal pipe class with `write`, `retrieve` and `close` blocking methods.

    This class implementation assumes that :attr:`pipe_class` (set as
    class:`queue.Queue` in current implementation) instances has both `put`
    and `get` blocking methods.

    Due its blocking implementation, this class uses :module:`threading`.

    This class exposes :method:`write` for :class:`tarfile.TarFile`
    `fileobj` compatibility.
    '''
    lock_class = threading.Lock
    pipe_class = functools.partial(compat.Queue, maxsize=1)
    abort_exception = BlockingPipeAbort

    def __init__(self):
        self._pipe = self.pipe_class()
        self._wlock = self.lock_class()
        self._rlock = self.lock_class()
        self.closed = False

    def write(self, data):
        '''
        Put chunk of data onto pipe.
        This method blocks if pipe is already full.

        :param data: bytes to write to pipe
        :type data: bytes
        :returns: number of bytes written
        :rtype: int
        :raises BlockingPipeAbort: (:attr:`abort_exception`) if pipe is
                                   already closed
        '''

        with self._wlock:
            if self.closed:
                raise self.abort_exception()
            self._pipe.put(data)
            return len(data)

    def retrieve(self):
        '''
        Get chunk of data from pipe.
        This method blocks if pipe is empty and still open.

        Data which was already written when the pipe got closed is still
        returned, so a writer closing the pipe right after its last write
        does not truncate the stream.

        :returns: data chunk
        :rtype: bytes
        :raises BlockingPipeAbort: (:attr:`abort_exception`) if pipe is
                                   closed and has no pending data, or if it
                                   is closed while blocking
        '''

        with self._rlock:
            # `close` only consumes from the pipe while holding the read
            # lock, so a non-empty pipe checked here cannot be emptied by
            # anyone else before the `get` below: it will not block.
            if self.closed and self._pipe.empty():
                raise self.abort_exception()
            data = self._pipe.get()
            if data is None:
                # `None` is the sentinel `close` uses to wake up a reader
                raise self.abort_exception()
            return data

    def __del__(self):
        '''
        Call :method:`BlockingPipe.close`.
        '''
        self.close()

    def close(self):
        '''
        Closes, so any blocked and future writes or retrieves will raise
        :attr:`abort_exception` instances (retrieves only once pending data
        has been consumed).
        '''
        if not self.closed:
            self.closed = True

            # Probe both locks without blocking: a lock which cannot be
            # acquired is being held by an ongoing `retrieve` or `write`.
            reading = not self._rlock.acquire(blocking=False)
            writing = not self._wlock.acquire(blocking=False)

            if not reading:
                if writing:
                    # take an item out so the ongoing `put` can finish
                    self._pipe.get()
                self._rlock.release()

            if not writing:
                if reading:
                    # wake up the ongoing `get` with the abort sentinel
                    self._pipe.put(None)
                self._wlock.release()
105
106
107
class TarFileStream(compat.Generator):
    '''
    Iterable/generator class which yields tarfile chunks for streaming.

    This class implements :class:`collections.abc.Generator` interface
    (`PEP 342 <https://www.python.org/dev/peps/pep-0342/>`_),
    so it can be appropriately handled by wsgi servers
    (`PEP 333 <https://www.python.org/dev/peps/pep-0333>`_).

    Buffsize can be provided, it should be a multiple of 512 (the tar block
    size) and will be used as tarfile block size.

    This class uses :module:`threading` for offloading.
    '''

    pipe_class = BlockingPipe
    abort_exception = BlockingPipe.abort_exception
    thread_class = threading.Thread
    tarfile_class = tarfile.open

    # compression method to file extension (anything else yields 'tar')
    extensions = {
        'gz': 'tgz',
        'bz2': 'tar.bz2',
        'xz': 'tar.xz',
        }

    @property
    def name(self):
        '''
        Filename for the tarball: base name of :attr:`path` plus the
        extension matching the compression method.

        :returns: tarball filename
        :rtype: str
        '''
        return '%s.%s' % (
            os.path.basename(self.path),
            self.extensions.get(self._compress, 'tar')
            )

    def __init__(self, path, buffsize=10240, exclude=None, compress='gz'):
        '''
        Initialize thread and class (thread is not started until iterated.)
        Note that compress parameter will be ignored if buffsize is below 16.

        :param path: local path of directory whose content will be compressed.
        :type path: str
        :param buffsize: byte size of tarfile blocks, defaults to 10KiB
        :type buffsize: int
        :param exclude: absolute path filtering function, defaults to None
        :type exclude: callable
        :param compress: compression method ('gz', 'bz2', 'xz' or None)
        :type compress: str or None
        '''
        self.path = path
        self.exclude = exclude

        self._started = False
        self._buffsize = buffsize
        self._compress = compress if compress and buffsize > 15 else ''
        self._pipe = self.pipe_class()
        self._th = self.thread_class(target=self._fill)

    def _fill(self):
        '''
        Writes data on internal tarfile instance, which writes to the
        internal pipe (:attr:`pipe_class` instance) using its `write` method.

        As this method is blocking, it is used inside a thread.

        This method is run automatically, on a thread started by the first
        :meth:`send` call, so there is little need to call it manually.

        The stream is always closed when this method ends, even on
        unexpected errors, so the consumer is never left blocked waiting
        for data which will never arrive.
        '''
        exclude = self.exclude
        path_join = os.path.join
        path = self.path

        def infofilter(info):
            return None if exclude(path_join(path, info.name)) else info

        try:
            archive = self.tarfile_class(
                fileobj=self._pipe,
                mode='w|{}'.format(self._compress),
                bufsize=self._buffsize
                )
            try:
                archive.add(path, "", filter=infofilter if exclude else None)
                archive.close()  # force stream flush
            except self.abort_exception:
                # expected exception when pipe is closed prematurely
                try:
                    archive.close()  # release tarfile resources
                except self.abort_exception:
                    # closing flushes buffered data to the (already closed)
                    # pipe, which aborts again: nothing else to do
                    pass
        finally:
            # no-op if already closed; unexpected errors still propagate
            self.close()

    def send(self, value):
        '''
        Pulls chunk from tarfile (which is processed on its own thread),
        starting that thread on first call.

        :param value: ignored, only present for generator interface
                      compatibility
        :returns: tarfile data as bytes
        :rtype: bytes
        :raises StopIteration: when the internal pipe has been closed
        '''
        if not self._started:
            self._started = True
            self._th.start()

        try:
            return self._pipe.retrieve()
        except self.abort_exception:
            raise StopIteration()

    def throw(self, typ, val=None, tb=None):
        '''
        Raise an exception in the coroutine.

        The given exception is always raised back to the caller; on
        :class:`GeneratorExit` the internal pipe is closed first.
        '''
        try:
            if val is None:
                if tb is None:
                    raise typ
                val = typ()
            if tb is not None:
                val = val.with_traceback(tb)
            raise val
        except GeneratorExit:
            self._pipe.close()
            raise

    def __del__(self):
        '''
        Call :method:`TarFileStream.close`.
        '''
        # NOTE(review): `close` is not defined in this class, presumably
        # inherited from `compat.Generator` -- confirm.
        self.close()
235