Completed
Push — file-actions ( 28a134...b2ad98 )
by Felipe A.
01:44
created

TarFileStream.send()   A

Complexity

Conditions 3

Size

Total Lines 17

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 17
rs 9.4285
cc 3
1
2
import os
3
import os.path
4
import tarfile
5
import threading
6
import functools
7
8
import browsepy.compat as compat
9
10
11
class BlockingPipeAbort(RuntimeError):
    '''
    Exception used internally by :class:`BlockingPipe`'s default
    implementation.

    It signals that a pipe operation was attempted on, or interrupted by,
    a closed pipe.
    '''
17
18
19
class BlockingPipe(object):
    '''
    Minimal pipe class with `write`, `retrieve` and `close` blocking methods.

    This class implementation assumes that :attr:`pipe_class` (set as
    :class:`queue.Queue` in current implementation) instances have both
    `put` and `get` blocking methods.

    Due to its blocking implementation, this class uses :mod:`threading`.

    This class exposes :meth:`write` for :class:`tarfile.TarFile`
    `fileobj` compatibility.
    '''
    lock_class = threading.Lock
    # maxsize=1 is meant to keep a single pending chunk in the pipe
    # (assuming compat.Queue behaves like the standard queue.Queue).
    pipe_class = functools.partial(compat.Queue, maxsize=1)
    abort_exception = BlockingPipeAbort

    def __init__(self):
        '''
        Create the underlying pipe and the writer and reader locks.
        '''
        self._pipe = self.pipe_class()
        self._wlock = self.lock_class()
        self._rlock = self.lock_class()
        self.closed = False

    def write(self, data):
        '''
        Put chunk of data onto pipe.
        This method blocks if pipe is already full.

        :param data: bytes to write to pipe
        :type data: bytes
        :returns: number of bytes written
        :rtype: int
        :raises BlockingPipeAbort: if pipe is already closed
        '''

        with self._wlock:
            if self.closed:
                raise self.abort_exception()
            self._pipe.put(data)
            return len(data)

    def retrieve(self):
        '''
        Get chunk of data from pipe.
        This method blocks if pipe is empty.

        :returns: data chunk
        :rtype: bytes
        :raises BlockingPipeAbort: if already closed or closed while blocking
        '''

        with self._rlock:
            if self.closed:
                raise self.abort_exception()
            data = self._pipe.get()
            if data is None:
                # None is the wake-up marker queued by close()
                raise self.abort_exception()
            return data

    def __del__(self):
        '''
        Call :meth:`BlockingPipe.close`.
        '''
        # `closed` is the last attribute assigned by __init__, so its absence
        # means construction failed and there is nothing to close.
        if hasattr(self, 'closed'):
            self.close()

    def close(self):
        '''
        Closes, so any blocked and future writes or retrieves will raise
        :attr:`abort_exception` instances.

        Data still queued when the pipe is closed is not delivered to later
        :meth:`retrieve` calls, as they raise before reading the pipe.
        '''
        if not self.closed:
            self.closed = True

            # A failed non-blocking acquire means another thread is currently
            # inside retrieve() (reading) or write() (writing).
            # NOTE(review): a thread holding one of these locks may still
            # raise on its own `closed` check instead of touching the pipe,
            # in which case the blocking get/put below has no counterpart;
            # confirm whether that window needs handling.
            reading = not self._rlock.acquire(False)
            writing = not self._wlock.acquire(False)

            if not reading:
                if writing:
                    # make room so the writer's put can complete
                    self._pipe.get()
                self._rlock.release()

            if not writing:
                if reading:
                    # wake the reader up with the abort marker
                    self._pipe.put(None)
                self._wlock.release()
105
106
107
class TarFileStream(compat.Iterator):
    '''
    Iterator class which yields tarfile chunks for streaming.

    This class implements :class:`collections.abc.Iterator` interface
    with :meth:`close`, so it can be appropriately handled by wsgi servers
    (`PEP 333 <https://www.python.org/dev/peps/pep-0333>`_).

    Buffsize can be provided, it should be a multiple of 512 (the tar block
    size) and will be used as tarfile block size.

    This class uses :mod:`threading` for offloading.
    '''

    pipe_class = BlockingPipe
    abort_exception = BlockingPipe.abort_exception
    thread_class = threading.Thread
    tarfile_class = tarfile.open

    # compression method -> file extension ('tar' is used for anything else)
    extensions = {
        'gz': 'tgz',
        'bz2': 'tar.bz2',
        'xz': 'tar.xz',
        }

    @property
    def name(self):
        '''
        Archive file name: base name of :attr:`path` plus the extension
        matching the compression method.

        :returns: file name
        :rtype: str
        '''
        return '%s.%s' % (
            os.path.basename(self.path),
            self.extensions.get(self._compress, 'tar')
            )

    def __init__(self, path, buffsize=10240, exclude=None, compress='gz'):
        '''
        Initialize thread and class (thread is not started until iterated.)
        Note that compress parameter will be ignored if buffsize is below 16.

        :param path: local path of directory whose content will be compressed.
        :type path: str
        :param buffsize: byte size of tarfile blocks, defaults to 10KiB
        :type buffsize: int
        :param exclude: absolute path filtering function, defaults to None
        :type exclude: callable
        :param compress: compression method ('gz', 'bz2', 'xz' or None)
        :type compress: str or None
        '''
        self.path = path
        self.exclude = exclude

        self._started = False
        self._buffsize = buffsize
        self._compress = compress if compress and buffsize > 15 else ''
        self._pipe = self.pipe_class()
        self._th = self.thread_class(target=self._fill)

    def _fill(self):
        '''
        Writes data on internal tarfile instance, which writes to the
        internal pipe, using its `write` method.

        As this method is blocking, it is used inside a thread.

        This method is called automatically, on a thread, on first
        iteration, so there is little need to call it manually.

        The pipe is always closed when this method ends, even on unexpected
        errors, so the consuming side never stays blocked.
        '''
        exclude = self.exclude
        path_join = os.path.join
        path = self.path

        def infofilter(info):
            return None if exclude(path_join(path, info.name)) else info

        archive = None
        try:
            archive = self.tarfile_class(
                fileobj=self._pipe,
                mode='w|{}'.format(self._compress),
                bufsize=self._buffsize
                )
            archive.add(self.path, "", filter=infofilter if exclude else None)
            archive.close()  # force stream flush
            # End-of-stream marker: this write only returns once the pipe
            # has room again, that is, after the last real chunk has been
            # taken by the consumer. Closing right after the flush would
            # make the pipe drop a chunk which is still queued.
            self._pipe.write(b'')
        except self.abort_exception:
            # expected exception when pipe is closed prematurely
            if archive is not None:
                try:
                    archive.close()  # release archive resources
                except self.abort_exception:
                    # closing flushes into the already closed pipe
                    pass
        finally:
            self.close()

    def __next__(self):
        '''
        Pulls chunk from tarfile (which is processed on its own thread).

        The thread is started on first call.

        :returns: tarfile data as bytes
        :rtype: bytes
        :raises StopIteration: when the archive is complete or the stream
                               has been closed
        '''
        if not self._started:
            self._started = True
            self._th.start()

        try:
            chunk = self._pipe.retrieve()
        except self.abort_exception:
            raise StopIteration()

        if not chunk:
            # empty chunk is the end-of-stream marker written by _fill
            raise StopIteration()
        return chunk

    def close(self):
        '''
        Closes tarfile pipe and stops further processing.
        '''
        self._pipe.close()

    def __del__(self):
        '''
        Call :meth:`TarFileStream.close`.
        '''
        # _pipe is missing if __init__ failed before creating it
        if hasattr(self, '_pipe'):
            self.close()
223