Completed
Push — master ( c2b843...91b860 )
by Felipe A.
49s
created

browsepy.secure_filename()   B

Complexity

Conditions 5

Size

Total Lines 24

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 5
dl 0
loc 24
rs 8.1672
1
#!/usr/bin/env python
2
# -*- coding: UTF-8 -*-
3
4
import sys
5
import os
6
import os.path
7
import re
8
import shutil
9
import codecs
10
import threading
11
import string
12
import tarfile
13
import random
14
import datetime
15
16
from flask import current_app, send_from_directory, Response
17
from werkzeug.utils import cached_property
18
19
from .compat import PY_LEGACY, range, FileNotFoundError
20
21
# Name under which the underscore-replacing codec error handler is registered.
undescore_replace = '%s:underscore' % __name__


def _replace_with_underscore(error):
    '''
    Codec error handler replacing a failing character with an underscore.

    :param error: codec error carrying the start index of the failing range
    :return: replacement text and the position where processing resumes
    :rtype: tuple
    '''
    # Legacy interpreters get an explicit unicode replacement.
    replacement = u'_' if PY_LEGACY else '_'
    return (replacement, error.start + 1)


codecs.register_error(undescore_replace, _replace_with_underscore)
27
28
class File(object):
    '''
    Filesystem node (file or directory) bound to an application.

    Every permission and path-related property is resolved against the
    ``config`` mapping of the application given at construction time.
    '''
    # Extracts the charset parameter out of a mimetype string.
    re_charset = re.compile('; charset=(?P<charset>[^;]+)')

    def __init__(self, path=None, app=None):
        '''
        :param path: filesystem path of the node
        :param app: application object, defaults to flask's current_app
        '''
        self.path = path
        self.app = current_app if app is None else app

    def remove(self):
        '''
        Delete this node from the filesystem (recursively for directories).

        :raises OutsideRemovableBase: if :attr:`can_remove` is false
        '''
        if not self.can_remove:
            raise OutsideRemovableBase("File outside removable base")
        if self.is_directory:
            shutil.rmtree(self.path)
        else:
            os.unlink(self.path)

    def download(self):
        '''
        Build the response used to download this node.

        Directories are streamed through :class:`TarFileStream`, anything
        else is sent as an attachment with flask's send_from_directory.

        :return: flask response
        '''
        if self.is_directory:
            stream = TarFileStream(
                self.path,
                self.app.config["directory_tar_buffsize"]
                )
            return Response(stream, mimetype="application/octet-stream")
        directory, name = os.path.split(self.path)
        return send_from_directory(directory, name, as_attachment=True)

    def contains(self, filename):
        '''
        Check if an entry with given name exists directly under this path.

        :param filename: entry name
        :rtype: bool
        '''
        return os.path.exists(os.path.join(self.path, filename))

    def choose_filename(self, filename, attempts=999):
        '''
        Get a filename, based on the given one, not present on this directory.

        Numbered alternatives are tried first; once they are exhausted
        random alternatives are generated until a free one is found.

        :param filename: desired filename
        :param attempts: highest number tried for numbered alternatives
        :return: filename not contained by this directory
        '''
        new_filename = filename
        for attempt in range(2, attempts+1):
            if not self.contains(new_filename):
                return new_filename
            new_filename = alternative_filename(filename, attempt)
        while self.contains(new_filename):
            new_filename = alternative_filename(filename)
        return new_filename

    @property
    def plugin_manager(self):
        '''Plugin manager registered on the application extensions.'''
        return self.app.extensions['plugin_manager']

    @property
    def actions(self):
        '''Actions the plugin manager returns for this node mimetype.'''
        return self.plugin_manager.get_actions(self.mimetype)

    @cached_property
    def can_download(self):
        '''Whether the node can be downloaded (directories only if enabled).'''
        return self.app.config['directory_downloadable'] or not self.is_directory

    @cached_property
    def can_remove(self):
        '''Whether the node lies strictly below the removable base directory.'''
        dirbase = self.app.config["directory_remove"]
        if dirbase:
            return self.path.startswith(dirbase + os.sep)
        return False

    @cached_property
    def can_upload(self):
        '''Whether the node is a directory at or below the upload base.'''
        dirbase = self.app.config["directory_upload"]
        if self.is_directory and dirbase:
            return dirbase == self.path or self.path.startswith(dirbase + os.sep)
        return False

    @cached_property
    def stats(self):
        '''Result of os.stat for this path.'''
        return os.stat(self.path)

    @cached_property
    def mimetype(self):
        '''Mimetype, ``inode/directory`` for directories.'''
        if self.is_directory:
            return 'inode/directory'
        return self.plugin_manager.get_mimetype(self.path)

    @cached_property
    def is_directory(self):
        '''Whether the path is a directory.'''
        return os.path.isdir(self.path)

    @cached_property
    def is_file(self):
        '''Whether the path is a regular file.'''
        return os.path.isfile(self.path)

    @cached_property
    def is_empty(self):
        '''
        Whether the directory has no entries.

        os.listdir errors propagate when the path cannot be listed.
        '''
        # The class defines no ``_listdir`` attribute, list the path directly.
        return not os.listdir(self.path)

    @cached_property
    def parent(self):
        '''Parent node, or None when this node is the configured base.'''
        if self.path == self.app.config['directory_base']:
            return None
        return self.__class__(os.path.dirname(self.path), self.app)

    @cached_property
    def ancestors(self):
        '''Tuple of parent nodes, nearest first.'''
        ancestors = []
        parent = self.parent
        while parent:
            ancestors.append(parent)
            parent = parent.parent
        return tuple(ancestors)

    @property
    def modified(self):
        '''Modification time formatted as ``%Y.%m.%d %H:%M:%S``.'''
        # The class defines no ``mtime`` attribute, read it from stats.
        mtime = self.stats.st_mtime
        return datetime.datetime.fromtimestamp(mtime).strftime('%Y.%m.%d %H:%M:%S')

    @property
    def size(self):
        '''Human readable size, without decimals when expressed in bytes.'''
        size, unit = fmt_size(self.stats.st_size, self.app.config["use_binary_multiples"])
        if unit == binary_units[0]:
            return "%d %s" % (size, unit)
        return "%.2f %s" % (size, unit)

    @property
    def urlpath(self):
        '''Path relative to the configured base, using ``/`` as separator.'''
        return abspath_to_urlpath(self.path, self.app.config['directory_base'])

    @property
    def name(self):
        '''Last component of the path.'''
        return os.path.basename(self.path)

    @property
    def type(self):
        '''Mimetype without parameters.'''
        return self.mimetype.split(";", 1)[0]

    @property
    def encoding(self):
        '''Charset found on the mimetype, ``default`` when there is none.'''
        if ";" in self.mimetype:
            match = self.re_charset.search(self.mimetype)
            gdict = match.groupdict() if match else {}
            return gdict.get("charset") or "default"
        return "default"

    def listdir(self):
        '''
        Get the directory entries as instances of this same class.

        :return: entries sorted by (is_directory, lowercase name)
        :rtype: list
        '''
        content = [
            self.__class__(path=os.path.join(self.path, path), app=self.app)
            for path in os.listdir(self.path)
            ]
        content.sort(key=lambda f: (f.is_directory, f.name.lower()))
        return content

    @classmethod
    def from_urlpath(cls, path, app=None):
        '''
        Alternative constructor taking a path relative to the configured base.

        :param path: relative path using ``/`` as separator
        :param app: application object, defaults to flask's current_app
        :return: new instance
        :raises OutsideDirectoryBase: if resulting path is not below base
        '''
        app = app or current_app
        base = app.config['directory_base']
        return cls(path=urlpath_to_abspath(path, base), app=app)
172
173
174
class TarFileStream(object):
    '''
    Tarfile which compresses while reading for streaming.

    Buffsize can be provided, it must be 512 multiple (the tar block size) for
    compression.

    The archive is produced by a background thread running :meth:`fill`,
    which uses this object as the tarfile output and is paced by the
    consumer calling :meth:`read`.
    '''
    event_class = threading.Event
    thread_class = threading.Thread
    tarfile_class = tarfile.open

    def __init__(self, path, buffsize=10240):
        '''
        :param path: path to be archived
        :param buffsize: tarfile buffer size, defaults to 10240
        '''
        self.path = path
        self.name = os.path.basename(path) + ".tgz"

        # 0: archiving, 1: archive done, 2: end of stream already reported
        self._finished = 0
        self._want = 0
        self._data = bytes()
        self._add = self.event_class()  # set while the reader wants data
        self._result = self.event_class()  # set when data is ready (or done)
        self._tarfile = self.tarfile_class(fileobj=self, mode="w|gz", bufsize=buffsize) # stream write
        self._th = self.thread_class(target=self.fill)
        self._th.start()

    def fill(self):
        '''
        Write the whole archive, thread target.

        Marks the stream as finished and wakes up any pending reader.
        '''
        self._tarfile.add(self.path, "")
        self._tarfile.close() # force stream flush
        self._finished += 1
        if not self._result.is_set():
            self._result.set()

    def write(self, data):
        '''
        Receive compressed data from tarfile, blocking until it is requested.

        :param data: compressed chunk
        :return: number of bytes taken
        :rtype: int
        '''
        self._add.wait()
        self._data += data
        if len(self._data) > self._want:
            # enough data buffered: pause the writer and wake up the reader
            self._add.clear()
            self._result.set()
        return len(data)

    def read(self, want=0):
        '''
        Get compressed data, blocking until it is available.

        :param want: maximum number of bytes, 0 for everything buffered
        :return: data, empty once the archive is complete
        :rtype: bytes
        :raises EOFError: if called again after end of stream was reported
        '''
        if self._finished:
            if self._finished == 1:
                self._finished += 1
                # same type as the buffered data
                return bytes()
            raise EOFError("EOF reached")

        # Thread communication
        self._want = want
        self._add.set()
        self._result.wait()
        self._result.clear()

        if want:
            data = self._data[:want]
            self._data = self._data[want:]
        else:
            data = self._data
            self._data = bytes()
        return data

    def __iter__(self):
        '''Iterate over data chunks until an empty one is read.'''
        data = self.read()
        while data:
            yield data
            data = self.read()
239
240
241
class OutsideDirectoryBase(Exception):
    '''Raised when a path is not under the expected base directory.'''
243
244
245
class OutsideRemovableBase(Exception):
    '''Raised when removal is requested for a file outside the removable base.'''
247
248
249
binary_units = ("B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB")
standard_units = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")


def fmt_size(size, binary=True):
    '''
    Scale a byte count down to the first unit where it is below 1000.

    :param size: size in bytes
    :param binary: divide by 1024 and use binary units if true (default),
                   divide by 1000 and use standard units otherwise
    :return: scaled size and its unit
    :rtype: tuple of number and unit as str
    '''
    units, divider = (binary_units, 1024.) if binary else (standard_units, 1000.)
    last = len(units) - 1
    index = 0
    while index < last:
        # the threshold is 1000 for both unit systems
        if size < 1000:
            return (size, units[index])
        size /= divider
        index += 1
    # the biggest unit takes whatever is left
    return size, units[last]
271
272
def relativize_path(path, base, os_sep=os.sep):
    '''
    Strip an absolute base (and the separator following it) from a path.

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: path relative to base
    :rtype: str or unicode
    :raises OutsideDirectoryBase: if path is not below base
    '''
    if check_under_base(path, base, os_sep):
        # also skip the separator when base does not already end with it
        skip = len(base) if base.endswith(os_sep) else len(base) + len(os_sep)
        return path[skip:]
    raise OutsideDirectoryBase("%r is not under %r" % (path, base))
289
290
def abspath_to_urlpath(path, base, os_sep=os.sep):
    '''
    Convert an absolute filesystem path into an uri path relative to base.

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: relative uri, using slash as separator
    :rtype: str or unicode
    :raises OutsideDirectoryBase: if resulting path is not below base
    '''
    relative = relativize_path(path, base, os_sep)
    return relative.replace(os_sep, '/')
302
303
def urlpath_to_abspath(path, base, os_sep=os.sep):
    '''
    Convert an uri relative path into an absolute filesystem path under base.

    :param path: relative path, using slash as separator
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: absolute path
    :rtype: str or unicode
    :raises OutsideDirectoryBase: if resulting path is not below base
    '''
    prefix = base
    if not prefix.endswith(os_sep):
        prefix += os_sep
    native = path.replace('/', os_sep)
    realpath = os.path.abspath(prefix + native)
    # abspath collapses parent references, so the result has to be rechecked
    if realpath != base and not realpath.startswith(prefix):
        raise OutsideDirectoryBase("%r is not under %r" % (realpath, base))
    return realpath
319
320
common_path_separators = '\\/'


def generic_filename(path):
    '''
    Extract the filename of given path OS-independently, by dropping
    everything up to the last occurrence of every known path separator.

    :param path: path
    :return: filename
    :rtype: str or unicode (depending on given path)
    '''
    filename = path
    for separator in common_path_separators:
        # rpartition leaves the text untouched when separator is missing
        filename = filename.rpartition(separator)[2]
    return filename
334
335
restricted_chars = '\\/\0'


def clean_restricted_chars(path, restricted_chars=restricted_chars):
    '''
    Replace every restricted character found in path with an underscore.

    :param path: path
    :param restricted_chars: iterable of characters to replace, defaults to
                             backslash, slash and the null character
    :return: path without restricted characters
    :rtype: str or unicode (depending on given path)
    '''
    cleaned = path
    for forbidden in restricted_chars:
        if forbidden in cleaned:
            cleaned = cleaned.replace(forbidden, '_')
    return cleaned
347
348
restricted_names = ('.', '..', '::', os.sep)
# Device names reserved by Windows, with or without extension (COM1 to COM9
# and LPT1 to LPT9 are all reserved, not only the first few of them).
nt_device_names = (
    'CON', 'PRN', 'AUX', 'NUL',
    'COM1', 'COM2', 'COM3', 'COM4', 'COM5', 'COM6', 'COM7', 'COM8', 'COM9',
    'LPT1', 'LPT2', 'LPT3', 'LPT4', 'LPT5', 'LPT6', 'LPT7', 'LPT8', 'LPT9',
    )
fs_encoding = 'unicode' if os.name == 'nt' else sys.getfilesystemencoding() or 'ascii'


def check_forbidden_filename(filename, destiny_os=os.name, fs_encoding=fs_encoding,
                             restricted_names=restricted_names):
    '''
    Get if given filename is forbidden for current OS or filesystem.

    :param filename: filename to check
    :param destiny_os: destination operative system
    :param fs_encoding: destination filesystem filename encoding (accepted
                        for signature compatibility, not used by the check)
    :param restricted_names: names forbidden regardless of destination OS
    :return: whether is forbidden on given OS (or filesystem) or not
    :rtype: bool
    '''
    if destiny_os == 'nt':
        # device names are reserved case-insensitively and whatever follows
        # the first dot is ignored
        fpc = filename.split('.', 1)[0].upper()
        if fpc in nt_device_names:
            return True

    return filename in restricted_names
368
369
def check_under_base(path, base, os_sep=os.sep):
    '''
    Check if given absolute path is the given base or lies below it.

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: whether file is under given base or not
    :rtype: bool
    '''
    if path == base:
        return True
    # compare against base with trailing separator so siblings sharing the
    # same name prefix do not match
    if not base.endswith(os_sep):
        base = base + os_sep
    return path.startswith(base)
380
381
def secure_filename(path, destiny_os=os.name, fs_encoding=fs_encoding):
    '''
    Reduce an unsafe path to a bare filename acceptable for the destination.

    Parent path components are dropped and restricted characters replaced.
    Characters the filesystem encoding cannot represent are replaced with
    underscores. If the resulting name is forbidden, an empty string is
    returned.

    :param path: unsafe path
    :param destiny_os: destination operative system
    :param fs_encoding: destination filesystem filename encoding
    :return: filename or empty string
    :rtype: str or unicode (depending on python version, destiny_os and fs_encoding)
    '''
    filename = clean_restricted_chars(generic_filename(path))

    forbidden = check_forbidden_filename(
        filename,
        destiny_os=destiny_os,
        fs_encoding=fs_encoding,
        )
    if forbidden:
        return ''

    if fs_encoding == 'unicode':
        return filename

    if PY_LEGACY and not isinstance(filename, unicode):
        filename = unicode(filename, encoding='latin-1')
    # round trip through the filesystem encoding, unsupported characters
    # are handled by the registered underscore error handler
    encoded = filename.encode(fs_encoding, errors=undescore_replace)
    return encoded.decode(fs_encoding)
405
406
fs_safe_characters = string.ascii_uppercase + string.digits


def alternative_filename(filename, attempt=None):
    '''
    Build a variation of given filename by inserting a marker before its
    (up to two) trailing extensions.

    The marker is the attempt number between parentheses when given, eight
    random uppercase letters or digits otherwise.

    :param filename: original filename
    :param attempt: optional attempt number, defaults to None
    :return: new filename
    :rtype: str or unicode
    '''
    pieces = filename.rsplit('.', 2)
    stem, suffixes = pieces[0], pieces[1:]
    if attempt is not None:
        marker = ' (%d)' % attempt
    else:
        token = ''.join(random.choice(fs_safe_characters) for _ in range(8))
        marker = ' %s' % token
    tail = ''.join('.%s' % suffix for suffix in suffixes)
    return '%s%s%s' % (stem, marker, tail)
427