Completed
Push — master ( 8473d5...821f0f )
by Felipe A.
01:23
created

TarFileStream   A

Complexity

Total Complexity 11

Size/Duplication

Total Lines 65
Duplicated Lines 0 %

Importance

Changes 6
Bugs 0 Features 2
Metric Value
c 6
b 0
f 2
dl 0
loc 65
rs 10
wmc 11

5 Methods

Rating   Name   Duplication   Size   Complexity  
A read() 0 20 4
A __iter__() 0 5 2
A fill() 0 6 2
A write() 0 7 2
A __init__() 0 12 1
1
#!/usr/bin/env python
2
# -*- coding: UTF-8 -*-
3
4
import sys
5
import os
6
import os.path
7
import re
8
import shutil
9
import codecs
10
import threading
11
import string
12
import tarfile
13
import random
14
import datetime
15
import functools
16
17
from flask import current_app, send_from_directory, Response
18
from werkzeug.utils import cached_property
19
20
from .compat import PY_LEGACY, range
21
22
# Name under which this module registers its codec error handler; the handler
# substitutes a single underscore for the offending character and resumes
# right after it (error.start + 1).
undescore_replace = '%s:underscore' % __name__

if PY_LEGACY:
    # Legacy interpreters get an explicit unicode replacement.
    codecs.register_error(
        undescore_replace,
        lambda error: (u'_', error.start + 1)
        )
else:
    codecs.register_error(
        undescore_replace,
        lambda error: ('_', error.start + 1)
        )
    # Alias so the rest of the module can keep referring to `unicode`.
    unicode = str
30
31
32
class File(object):
    '''
    Filesystem entry (file or directory) bound to an application object.

    The application is only used for its `config` mapping and its
    `extensions['plugin_manager']` entry.
    '''
    re_charset = re.compile('; charset=(?P<charset>[^;]+)')
    parent_class = None  # none means current class

    def __init__(self, path=None, app=None):
        '''
        :param path: filesystem path of this entry
        :param app: application object, defaults to flask's current_app
        '''
        self.path = path
        self.app = app if app is not None else current_app

    def remove(self):
        '''
        Delete this entry from the filesystem (recursively for directories).

        :raises OutsideRemovableBase: if `can_remove` is false
        '''
        if not self.can_remove:
            raise OutsideRemovableBase("File outside removable base")
        remover = shutil.rmtree if self.is_directory else os.unlink
        remover(self.path)

    def download(self):
        '''
        Build the download response for this entry.

        Directories are streamed through TarFileStream, anything else is
        served as an attachment with send_from_directory.

        :return: response object
        '''
        if not self.is_directory:
            directory, name = os.path.split(self.path)
            return send_from_directory(directory, name, as_attachment=True)
        stream = TarFileStream(
            self.path,
            self.app.config["directory_tar_buffsize"]
            )
        return Response(stream, mimetype="application/octet-stream")

    def contains(self, filename):
        '''
        Check whether given name exists inside this entry's path.

        :param filename: name relative to this entry
        :rtype: bool
        '''
        candidate = os.path.join(self.path, filename)
        return os.path.exists(candidate)

    def choose_filename(self, filename, attempts=999):
        '''
        Get a name, based on given one, which does not exist in this entry.

        Numbered alternatives (from 2 up to `attempts`) are tried first, then
        random alternatives until a free one is found.

        :param filename: desired filename
        :param attempts: highest attempt number used for numbered names
        :return: filename not present in this entry
        '''
        candidate = filename
        for number in range(2, attempts + 1):
            if not self.contains(candidate):
                return candidate
            candidate = alternative_filename(filename, number)
        while self.contains(candidate):
            candidate = alternative_filename(filename)
        return candidate

    @property
    def plugin_manager(self):
        '''Plugin manager registered on the application extensions.'''
        return self.app.extensions['plugin_manager']

    @property
    def default_action(self):
        '''
        First action whose widget place is 'link', or a newly built
        'browse' (directories) / 'open' (anything else) action otherwise.
        '''
        linked = next(
            (action for action in self.actions
             if action.widget.place == 'link'),
            None
            )
        if linked is not None:
            return linked
        manager = self.plugin_manager
        endpoint = 'browse' if self.is_directory else 'open'
        widget = manager.link_class.from_file(self)
        return manager.action_class(endpoint, widget)

    @cached_property
    def actions(self):
        '''Actions the plugin manager returns for this entry.'''
        return self.plugin_manager.get_actions(self)

    @cached_property
    def can_download(self):
        '''
        Truthy when the 'directory_downloadable' option is set or when this
        entry is not a directory.
        '''
        downloadable = self.app.config['directory_downloadable']
        return downloadable or not self.is_directory

    @cached_property
    def can_remove(self):
        '''
        Whether path lies strictly below the 'directory_remove' option.
        '''
        dirbase = self.app.config["directory_remove"]
        if not dirbase:
            return False
        return self.path.startswith(dirbase + os.sep)

    @cached_property
    def can_upload(self):
        '''
        Whether this entry is a directory equal to, or below, the
        'directory_upload' option.
        '''
        dirbase = self.app.config["directory_upload"]
        if not (self.is_directory and dirbase):
            return False
        return dirbase == self.path or self.path.startswith(dirbase + os.sep)

    @cached_property
    def stats(self):
        '''Result of os.stat on path.'''
        return os.stat(self.path)

    @cached_property
    def mimetype(self):
        '''
        'inode/directory' for directories, otherwise whatever the plugin
        manager's get_mimetype returns for path.
        '''
        if not self.is_directory:
            return self.plugin_manager.get_mimetype(self.path)
        return 'inode/directory'

    @cached_property
    def is_directory(self):
        '''Whether path is a directory.'''
        return os.path.isdir(self.path)

    @cached_property
    def is_file(self):
        '''Whether path is a regular file.'''
        return os.path.isfile(self.path)

    @cached_property
    def is_empty(self):
        '''Whether the directory listing of path is empty.'''
        return not self.raw_listdir

    @cached_property
    def parent(self):
        '''
        Entry for the parent directory, or None when path equals the
        'directory_base' option.
        '''
        if self.path == self.app.config['directory_base']:
            return None
        factory = self.parent_class or self.__class__
        return factory(os.path.dirname(self.path), self.app)

    @cached_property
    def ancestors(self):
        '''Tuple of parents, nearest first.'''
        chain = []
        node = self.parent
        while node:
            chain.append(node)
            node = node.parent
        return tuple(chain)

    @cached_property
    def raw_listdir(self):
        '''Result of os.listdir on path.'''
        return os.listdir(self.path)

    @property
    def modified(self):
        '''Modification time formatted as 'YYYY.mm.dd HH:MM:SS'.'''
        moment = datetime.datetime.fromtimestamp(self.stats.st_mtime)
        return moment.strftime('%Y.%m.%d %H:%M:%S')

    @property
    def size(self):
        '''
        Human readable size, honouring the 'use_binary_multiples' option.

        Plain bytes are shown as integer, any other unit with two decimals.
        '''
        size, unit = fmt_size(
            self.stats.st_size,
            self.app.config["use_binary_multiples"]
            )
        template = "%d %s" if unit == binary_units[0] else "%.2f %s"
        return template % (size, unit)

    @property
    def urlpath(self):
        '''Path relative to the 'directory_base' option, using '/'.'''
        return abspath_to_urlpath(self.path, self.app.config['directory_base'])

    @property
    def name(self):
        '''Last component of path.'''
        return os.path.basename(self.path)

    @property
    def type(self):
        '''Mimetype without parameters (text before the first ';').'''
        return self.mimetype.split(";", 1)[0]

    @property
    def encoding(self):
        '''Charset parameter of mimetype, or "default" when absent.'''
        mimetype = self.mimetype
        if ";" not in mimetype:
            return "default"
        match = self.re_charset.search(mimetype)
        if not match:
            return "default"
        return match.groupdict().get("charset") or "default"

    def listdir(self):
        '''
        Get entries for the directory content, sorted by the
        (is_directory, lowercased name) key.

        :return: list of instances of this same class
        '''
        path_joiner = functools.partial(os.path.join, self.path)
        entries = (
            self.__class__(path=path_joiner(name), app=self.app)
            for name in self.raw_listdir
            )
        return sorted(
            entries,
            key=lambda entry: (entry.is_directory, entry.name.lower())
            )

    @classmethod
    def from_urlpath(cls, path, app=None):
        '''
        Alternative constructor taking a path relative to the
        'directory_base' option.

        :param path: relative url path
        :param app: application object, defaults to flask's current_app
        :return: new instance
        '''
        if not app:
            app = current_app
        base = app.config['directory_base']
        return cls(path=urlpath_to_abspath(path, base), app=app)
193
194
195
class TarFileStream(object):
    '''
    Tarfile which compresses while reading for streaming.

    Buffsize can be provided, it must be 512 multiple (the tar block size) for
    compression.

    A background thread runs the tarfile writer, using this object as its
    output file object. Reader (`read` / iteration) and writer (`write`) are
    synchronized with two events: `_add` lets the writer append data,
    `_result` tells the reader that data (or the end of stream) is available.
    '''
    event_class = threading.Event
    thread_class = threading.Thread
    tarfile_class = tarfile.open

    def __init__(self, path, buffsize=10240):
        '''
        Create the stream and start the background compression thread.

        :param path: path of the file or directory to archive
        :param buffsize: buffer size handed to the tarfile writer
        '''
        self.path = path
        self.name = os.path.basename(path) + ".tgz"

        self._finished = 0  # 0: writing, 1: writer done, 2: EOF delivered
        self._want = 0
        self._data = bytes()
        self._add = self.event_class()
        self._result = self.event_class()
        self._tarfile = self.tarfile_class(  # stream write
            fileobj=self,
            mode="w|gz",
            bufsize=buffsize
            )
        self._th = self.thread_class(target=self.fill)
        self._th.start()

    def fill(self):
        '''
        Writer thread body: add path to the tarfile and close it.

        The end of stream is always signalled, even if archiving fails, so a
        reader waiting for data is never left blocked forever.
        '''
        try:
            self._tarfile.add(self.path, "")
            self._tarfile.close()  # force stream flush
        finally:
            self._finished += 1
            if not self._result.is_set():
                self._result.set()

    def write(self, data):
        '''
        File-object write method used by the tarfile writer thread.

        Blocks until the reader asks for data, then buffers it and wakes the
        reader once more than the wanted amount is available.

        :param data: compressed bytes
        :return: number of bytes accepted
        '''
        self._add.wait()
        self._data += data
        if len(self._data) > self._want:
            self._add.clear()
            self._result.set()
        return len(data)

    def _take(self, want):
        '''Remove and return up to `want` buffered bytes (all of them if 0).'''
        if want:
            data = self._data[:want]
            self._data = self._data[want:]
        else:
            data = self._data
            self._data = bytes()
        return data

    def read(self, want=0):
        '''
        Get compressed data, blocking until the writer provides some.

        :param want: maximum number of bytes to return, 0 means everything
                     currently buffered
        :return: data, or empty bytes once when the stream is exhausted
        :rtype: bytes
        :raises EOFError: if called again after the empty end-of-stream value
        '''
        if self._finished:
            if self._data:
                # writer is done but buffered bytes remain: drain them first
                return self._take(want)
            if self._finished == 1:
                self._finished += 1
                return bytes()
            raise EOFError("EOF reached")

        # Thread communication
        self._want = want
        self._add.set()
        self._result.wait()
        self._result.clear()

        return self._take(want)

    def __iter__(self):
        '''Yield data chunks until an empty one is read.'''
        data = self.read()
        while data:
            yield data
            data = self.read()
260
261
262
class OutsideDirectoryBase(Exception):
    '''
    Error raised by the path helpers of this module when a path is not under
    the given base directory.
    '''
264
265
266
class OutsideRemovableBase(Exception):
    '''
    Error raised by File.remove when the file is not allowed to be removed.
    '''
268
269
270
binary_units = ("B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB")
standard_units = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")


def fmt_size(size, binary=True):
    '''
    Scale a byte count down to the biggest fitting unit.

    The value is divided (by 1024 for binary units, by 1000 for standard
    ones) until it drops below 1000 or the last unit is reached.

    :param size: size in bytes
    :param binary: whether use binary or standard units, defaults to True
    :return: size and unit; size is returned untouched when no division was
             needed and as float otherwise
    :rtype: tuple of number and unit as str
    '''
    units, divider = (
        (binary_units, 1024.) if binary else (standard_units, 1000.)
        )
    position = 0
    last = len(units) - 1
    while position < last and not size < 1000:
        size /= divider
        position += 1
    return size, units[position]
292
293
def relativize_path(path, base, os_sep=os.sep):
    '''
    Strip an absolute base (and the separator following it) from an
    absolute path.

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: relative path
    :rtype: str or unicode
    :raises OutsideDirectoryBase: if path is not below base
    '''
    if check_under_base(path, base, os_sep):
        skip = len(base)
        if not base.endswith(os_sep):
            # also drop the separator between base and the remainder
            skip += len(os_sep)
        return path[skip:]
    raise OutsideDirectoryBase("%r is not under %r" % (path, base))
310
311
def abspath_to_urlpath(path, base, os_sep=os.sep):
    '''
    Convert an absolute filesystem path into an uri path relative to the
    given absolute base path.

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: relative uri
    :rtype: str or unicode
    :raises OutsideDirectoryBase: if resulting path is not below base
    '''
    relative = relativize_path(path, base, os_sep)
    return relative.replace(os_sep, '/')
323
324
def urlpath_to_abspath(path, base, os_sep=os.sep):
    '''
    Convert an uri relative path into an absolute filesystem path below the
    given absolute base path.

    :param path: relative path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: absolute path
    :rtype: str or unicode
    :raises OutsideDirectoryBase: if resulting path is not below base
    '''
    prefix = base
    if not prefix.endswith(os_sep):
        prefix = prefix + os_sep
    native = path.replace('/', os_sep)
    realpath = os.path.abspath(prefix + native)
    # abspath collapses '..' components, so the check runs on the final path
    if not (base == realpath or realpath.startswith(prefix)):
        raise OutsideDirectoryBase("%r is not under %r" % (realpath, base))
    return realpath
340
341
common_path_separators = '\\/'


def generic_filename(path):
    '''
    Extract the filename of given path OS-independently, taking care of
    known path separators.

    :param path: path
    :return: filename
    :rtype: str or unicode (depending on given path)
    '''
    for separator in common_path_separators:
        # rsplit returns the path untouched when separator is absent
        path = path.rsplit(separator, 1)[-1]
    return path
355
356
restricted_chars = '\\/\0'


def clean_restricted_chars(path, restricted_chars=restricted_chars):
    '''
    Replace every restricted character of path with an underscore.

    :param path: path
    :param restricted_chars: iterable of characters to replace
    :return: path without restricted characters
    :rtype: str or unicode (depending on given path)
    '''
    return functools.reduce(
        lambda cleaned, character: cleaned.replace(character, '_'),
        restricted_chars,
        path
        )
368
369
restricted_names = ('.', '..', '::', os.sep)
# Reserved device names on Windows: CON, PRN, AUX, NUL plus COM1-COM9 and
# LPT1-LPT9 (the whole 1-9 range is reserved, not only the first ports).
nt_device_names = (
    ('CON', 'PRN', 'AUX', 'NUL') +
    tuple('COM%d' % number for number in range(1, 10)) +
    tuple('LPT%d' % number for number in range(1, 10))
    )
fs_encoding = (
    'unicode' if os.name == 'nt' else sys.getfilesystemencoding() or 'ascii'
    )


def check_forbidden_filename(filename, destiny_os=os.name,
                             fs_encoding=fs_encoding,
                             restricted_names=restricted_names):
    '''
    Get if given filename is forbidden for current OS or filesystem.

    When the destination OS is 'nt', the text before the first dot is
    compared, case-insensitively, against the reserved device names.

    :param filename: filename to check
    :param destiny_os: destination operative system
    :param fs_encoding: destination filesystem filename encoding (accepted
                        for signature compatibility, not used by the check)
    :param restricted_names: names forbidden regardless of destination OS
    :return: whether is forbidden on given OS (or filesystem) or not
    :rtype: bool
    '''
    if destiny_os == 'nt':
        # device names are reserved whatever their extension is
        fpc = filename.split('.', 1)[0].upper()
        if fpc in nt_device_names:
            return True

    return filename in restricted_names
389
390
def check_under_base(path, base, os_sep=os.sep):
    '''
    Check if given absolute path is the given base or lies under it.

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: whether file is under given base or not
    :rtype: bool
    '''
    if path == base:
        return True
    # compare against base plus separator so sibling names sharing the same
    # prefix do not match
    prefix = base if base.endswith(os_sep) else base + os_sep
    return path.startswith(prefix)
401
402
def secure_filename(path, destiny_os=os.name, fs_encoding=fs_encoding):
    '''
    Reduce an unsafe path to a plain filename.

    Parent path components are dropped and restricted characters replaced;
    forbidden names yield an empty string. Unless the filesystem encoding is
    'unicode', characters it cannot encode are replaced by underscores.

    :param path: unsafe path
    :param destiny_os: destination operative system
    :param fs_encoding: destination filesystem filename encoding
    :return: filename or empty string
    :rtype: str or unicode (depending on python version, destiny_os and
            fs_encoding)
    '''
    path = clean_restricted_chars(generic_filename(path))

    forbidden = check_forbidden_filename(
        path,
        destiny_os=destiny_os,
        fs_encoding=fs_encoding
        )
    if forbidden:
        return ''

    if fs_encoding == 'unicode':
        return path

    if PY_LEGACY and not isinstance(path, unicode):
        path = unicode(path, encoding='latin-1')
    # round trip through the filesystem encoding using the module's
    # underscore-replacing error handler
    encoded = path.encode(fs_encoding, errors=undescore_replace)
    return encoded.decode(fs_encoding)
426
427
fs_safe_characters = string.ascii_uppercase + string.digits


def alternative_filename(filename, attempt=None):
    '''
    Build a variation of given filename, keeping up to two trailing
    extensions in place.

    With an attempt number the suffix is ' (N)', otherwise it is a space
    followed by eight random uppercase letters or digits.

    :param filename: original filename
    :param attempt: optional attempt number, defaults to null
    :return: new filename
    :rtype: str or unicode
    '''
    pieces = filename.rsplit('.', 2)
    name, extensions = pieces[0], pieces[1:]
    ext = ''.join(['.%s' % part for part in extensions])
    if attempt is not None:
        extra = ' (%d)' % attempt
    else:
        token = [random.choice(fs_safe_characters) for i in range(8)]
        extra = ' %s' % ''.join(token)
    return '%s%s%s' % (name, extra, ext)
448