Completed
Push — master ( f74340...c2b843 )
by Felipe A.
58s
created

browsepy.file.check_under_base()   A

Complexity

Conditions 2

Size

Total Lines 11

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 2
dl 0
loc 11
rs 9.4286
1
#!/usr/bin/env python
2
# -*- coding: UTF-8 -*-
3
4
import sys
5
import os
6
import os.path
7
import re
8
import shutil
9
import codecs
10
import threading
11
import string
12
import tarfile
13
import random
14
import datetime
15
16
from flask import current_app, send_from_directory, Response
17
from werkzeug.utils import cached_property
18
19
from ..compat import PY_LEGACY, range, FileNotFoundError
20
from .mimetype import detect_mimetype
21
22
# Name under which the codec error handler below is registered; it embeds the
# module name so it is unlikely to clash with handlers registered elsewhere.
undescore_replace = '%s:underscore' % __name__

if PY_LEGACY:
    def _underscore_error_handler(error):
        # Substitute a single underscore and resume right after the first
        # offending position (unicode literal on legacy interpreters).
        return (u'_', error.start + 1)
else:
    def _underscore_error_handler(error):
        # Substitute a single underscore and resume right after the first
        # offending position.
        return ('_', error.start + 1)

codecs.register_error(undescore_replace, _underscore_error_handler)
28
29
class File(object):
    '''
    Filesystem entry (file or directory) bound to an application.

    The application object supplies the ``config`` mapping (keys such as
    ``directory_base``, ``directory_remove``, ``directory_upload``) and the
    ``extensions`` mapping read by the properties below.
    '''
    # Extracts the charset parameter out of a "type; charset=..." mimetype.
    re_charset = re.compile('; charset=(?P<charset>[^;]+)')

    def __init__(self, path, app=None):
        '''
        :param path: filesystem path of the entry
        :param app: application object, defaults to flask's ``current_app``
        '''
        self.path = path
        self.app = current_app if app is None else app

    def remove(self):
        '''
        Delete this entry from the filesystem (recursively for directories).

        :raises OutsideRemovableBase: if :attr:`can_remove` is false
        '''
        if not self.can_remove:
            raise OutsideRemovableBase("File outside removable base")
        if self.is_directory:
            shutil.rmtree(self.path)
        else:
            os.unlink(self.path)

    def download(self):
        '''
        Build the download response for this entry.

        Directories are streamed through :class:`TarFileStream`; anything else
        is sent as an attachment with ``send_from_directory``.

        :return: response object
        '''
        if self.is_directory:
            stream = TarFileStream(
                self.path,
                self.app.config["directory_tar_buffsize"]
                )
            return Response(stream, mimetype="application/octet-stream")
        directory, name = os.path.split(self.path)
        return send_from_directory(directory, name, as_attachment=True)

    def contains(self, filename):
        '''
        Check whether an entry with the given name exists inside this path.

        :param filename: name relative to this entry's path
        :rtype: bool
        '''
        return os.path.exists(os.path.join(self.path, filename))

    def choose_filename(self, filename, attempts=999):
        '''
        Get a filename which does not exist yet inside this path.

        The given filename is tried first, then numbered alternatives
        (attempt numbers 2 up to ``attempts``), then random alternatives
        until a free one is found.

        :param filename: desired filename
        :param attempts: highest attempt number used before going random
        :return: filename not currently present inside this path
        '''
        new_filename = filename
        for attempt in range(2, attempts+1):
            if not self.contains(new_filename):
                return new_filename
            new_filename = alternative_filename(filename, attempt)
        while self.contains(new_filename):
            new_filename = alternative_filename(filename)
        return new_filename

    @property
    def actions(self):
        '''Result of the plugin manager's ``get_actions`` for this mimetype.'''
        plugin_manager = self.app.extensions['plugin_manager']
        return plugin_manager.get_actions(self.mimetype)

    @cached_property
    def can_download(self):
        '''Non-directories always; directories only if config allows it.'''
        return self.app.config['directory_downloadable'] or not self.is_directory

    @cached_property
    def can_remove(self):
        '''Whether path is strictly below the ``directory_remove`` base.'''
        dirbase = self.app.config["directory_remove"]
        if dirbase:
            return self.path.startswith(dirbase + os.sep)
        return False

    @cached_property
    def can_upload(self):
        '''Whether this is a directory at or below ``directory_upload``.'''
        dirbase = self.app.config["directory_upload"]
        if self.is_directory and dirbase:
            return dirbase == self.path or self.path.startswith(dirbase + os.sep)
        return False

    @cached_property
    def stats(self):
        '''``os.stat`` result for path (computed once per instance).'''
        return os.stat(self.path)

    @cached_property
    def mimetype(self):
        '''``inode/directory`` for directories, detected mimetype otherwise.'''
        if self.is_directory:
            return 'inode/directory'
        return detect_mimetype(self.path)

    @cached_property
    def is_directory(self):
        '''Whether path is a directory.'''
        return os.path.isdir(self.path)

    @cached_property
    def is_file(self):
        '''Whether path is a regular file.'''
        return os.path.isfile(self.path)

    @cached_property
    def is_empty(self):
        '''
        Whether the directory at path has no entries.

        ``os.listdir`` raises ``OSError`` if path is not a listable directory.
        '''
        # The class defines no `_listdir` attribute, so the directory is
        # listed directly instead of failing with AttributeError.
        return not os.listdir(self.path)

    @cached_property
    def parent(self):
        '''
        Parent directory as an instance of the same class.

        :return: parent, or None for ``directory_base`` and for paths that
                 have no parent
        '''
        if self.path == self.app.config['directory_base']:
            return None
        dirname = os.path.dirname(self.path)
        if dirname == self.path:
            # A filesystem root (or an empty path) is its own dirname; stop
            # here so walking up from outside directory_base terminates.
            return None
        return self.__class__(dirname, self.app)

    @cached_property
    def ancestors(self):
        '''Tuple of parents, nearest first, ending where parent is None.'''
        ancestors = []
        parent = self.parent
        while parent:
            ancestors.append(parent)
            parent = parent.parent
        return tuple(ancestors)

    @property
    def modified(self):
        '''Modification time formatted as ``YYYY.MM.DD HH:MM:SS``.'''
        # The class defines no `mtime` attribute; take it from the stat result.
        mtime = self.stats.st_mtime
        return datetime.datetime.fromtimestamp(mtime).strftime('%Y.%m.%d %H:%M:%S')

    @property
    def size(self):
        '''Human readable size; plain bytes are shown without decimals.'''
        size, unit = fmt_size(self.stats.st_size, self.app.config["use_binary_multiples"])
        if unit == binary_units[0]:
            return "%d %s" % (size, unit)
        return "%.2f %s" % (size, unit)

    @property
    def urlpath(self):
        '''Path relative to ``directory_base`` using ``/`` as separator.'''
        return abspath_to_urlpath(self.path, self.app.config['directory_base'])

    @property
    def name(self):
        '''Last component of path.'''
        return os.path.basename(self.path)

    @property
    def type(self):
        '''Mimetype without its parameters (text before the first ``;``).'''
        return self.mimetype.split(";", 1)[0]

    @property
    def encoding(self):
        '''Charset parameter of the mimetype, or ``"default"`` if absent.'''
        if ";" in self.mimetype:
            match = self.re_charset.search(self.mimetype)
            gdict = match.groupdict() if match else {}
            return gdict.get("charset") or "default"
        return "default"

    def listdir(self):
        '''
        List directory content as instances of the same class.

        Entries are sorted by ``(is_directory, lowercase name)``, so
        non-directories come before directories.

        :rtype: list
        '''
        content = [
            self.__class__(path=os.path.join(self.path, path), app=self.app)
            for path in os.listdir(self.path)
            ]
        content.sort(key=lambda f: (f.is_directory, f.name.lower()))
        return content

    @classmethod
    def from_urlpath(cls, path, app=None):
        '''
        Alternative constructor taking a path relative to ``directory_base``.

        :param path: relative path using ``/`` as separator
        :param app: application object, defaults to flask's ``current_app``
        :return: instance for the resolved absolute path
        :raises OutsideDirectoryBase: if resolved path is not under the base
        '''
        app = app or current_app
        base = app.config['directory_base']
        return cls(path=urlpath_to_abspath(path, base), app=app)
170
171
172
class TarFileStream(object):
    '''
    Tarfile which compresses while reading for streaming.

    Buffsize can be provided, it must be 512 multiple (the tar block size) for
    compression.

    A worker thread writes the archive into this object (it is the tarfile's
    ``fileobj``) while the consumer pulls the produced bytes with
    :meth:`read` or by iterating.
    '''
    event_class = threading.Event
    thread_class = threading.Thread
    tarfile_class = tarfile.open

    def __init__(self, path, buffsize=10240):
        '''
        :param path: path to be archived
        :param buffsize: buffer size handed to the tarfile stream
        '''
        self.path = path
        self.name = os.path.basename(path) + ".tgz"

        # 0: worker running, 1: worker done, 2: end marker already delivered
        self._finished = 0
        self._want = 0
        self._data = bytes()
        # Set by read() to let the worker's write() proceed.
        self._add = self.event_class()
        # Set by the worker when read() has something to return.
        self._result = self.event_class()
        self._tarfile = self.tarfile_class(fileobj=self, mode="w|gz", bufsize=buffsize) # stream write
        self._th = self.thread_class(target=self.fill)
        self._th.start()

    def fill(self):
        '''
        Worker thread target: write the whole archive, then signal the end.
        '''
        try:
            self._tarfile.add(self.path, "")
            self._tarfile.close() # force stream flush
        finally:
            # Always mark the stream as finished and wake the reader, even
            # when archiving fails, so read() cannot block forever.
            self._finished += 1
            if not self._result.is_set():
                self._result.set()

    def write(self, data):
        '''
        File-like write used by the tarfile; blocks until a read is pending.

        :param data: compressed bytes produced by the tarfile
        :return: number of bytes accepted
        :rtype: int
        '''
        self._add.wait()
        self._data += data
        if len(self._data) > self._want:
            # Enough data for the pending read: pause writing, wake reader.
            self._add.clear()
            self._result.set()
        return len(data)

    def read(self, want=0):
        '''
        Get archive bytes, blocking until the worker has produced them.

        :param want: maximum number of bytes, 0 for everything buffered
        :return: bytes, empty once the archive is complete
        :rtype: bytes
        :raises EOFError: if called again after the empty end marker
        '''
        if self._finished:
            if self._finished == 1:
                self._finished += 1
                # Same type as every other read result.
                return bytes()
            raise EOFError("EOF reached")

        # Thread communication
        self._want = want
        self._add.set()
        self._result.wait()
        self._result.clear()

        if want:
            data = self._data[:want]
            self._data = self._data[want:]
        else:
            data = self._data
            self._data = bytes()
        return data

    def __iter__(self):
        '''Yield buffered chunks until an empty read is returned.'''
        data = self.read()
        while data:
            yield data
            data = self.read()
237
238
239
class OutsideDirectoryBase(Exception):
    '''Raised when a path is not under the directory base it must be in.'''
241
242
243
class OutsideRemovableBase(Exception):
    '''Raised when removing a file which is outside the removable base.'''
245
246
247
binary_units = ("B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB")
standard_units = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")


def fmt_size(size, binary=True):
    '''
    Scale a byte count down to the first unit where it is below 1000.

    The value is divided by 1024 (binary) or 1000 (standard) per unit step;
    the 1000 threshold is used in both modes. The largest unit is used when
    no smaller one fits.

    :param size: size in bytes
    :param binary: whether use binary or standard units, defaults to True
    :return: size and unit
    :rtype: tuple of number (float once scaled) and unit as str
    '''
    if binary:
        units, divider = binary_units, 1024.
    else:
        units, divider = standard_units, 1000.
    last = len(units) - 1
    position = 0
    # `not size < 1000` (rather than `>=`) mirrors the early-exit condition.
    while position < last and not size < 1000:
        size /= divider
        position += 1
    return (size, units[position])
269
270
def relativize_path(path, base, os_sep=os.sep):
    '''
    Strip an absolute base (and the separator following it) from a path.

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: relative path
    :rtype: str or unicode
    :raises OutsideDirectoryBase: if path is not below base
    '''
    if check_under_base(path, base, os_sep):
        # Drop the base plus one separator, unless base already ends with it.
        if base.endswith(os_sep):
            skip = len(base)
        else:
            skip = len(base) + len(os_sep)
        return path[skip:]
    raise OutsideDirectoryBase("%r is not under %r" % (path, base))
287
288
def abspath_to_urlpath(path, base, os_sep=os.sep):
    '''
    Convert an absolute filesystem path into an uri path relative to base.

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: relative uri
    :rtype: str or unicode
    :raises OutsideDirectoryBase: if resulting path is not below base
    '''
    relative = relativize_path(path, base, os_sep)
    # Uri paths always use forward slashes, whatever the OS separator is.
    urlpath = relative.replace(os_sep, '/')
    return urlpath
300
301
def urlpath_to_abspath(path, base, os_sep=os.sep):
    '''
    Make uri relative path fs absolute using a given absolute base path.

    The joined path is normalized with ``os.path.abspath``, so ``..``
    components are resolved before checking the result against base.

    :param path: relative path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: absolute path
    :rtype: str or unicode
    :raises OutsideDirectoryBase: if resulting path is not below base
    '''
    prefix = base if base.endswith(os_sep) else base + os_sep
    realpath = os.path.abspath(prefix + path.replace('/', os_sep))
    if (
      base == realpath or
      # abspath drops a trailing separator, so a base given with one would
      # otherwise never match the base directory itself.
      realpath + os_sep == prefix or
      realpath.startswith(prefix)
      ):
        return realpath
    raise OutsideDirectoryBase("%r is not under %r" % (realpath, base))
317
318
common_path_separators = '\\/'


def generic_filename(path):
    '''
    Get the last path component, whatever known separator the path uses.

    Every separator in ``common_path_separators`` is handled in turn, so
    paths mixing backslashes and slashes are reduced to their filename.

    :param path: path
    :return: filename
    :rtype: str or unicode (depending on given path)
    '''
    for separator in common_path_separators:
        # Without the separator rsplit yields the path itself as only item.
        path = path.rsplit(separator, 1)[-1]
    return path
332
333
restricted_chars = '\\/\0'


def clean_restricted_chars(path, restricted_chars=restricted_chars):
    '''
    Replace every restricted character of given path with an underscore.

    :param path: path
    :param restricted_chars: characters to replace, defaults to backslash,
                             slash and the null character
    :return: path without restricted characters
    :rtype: str or unicode (depending on given path)
    '''
    cleaned = path
    for forbidden in restricted_chars:
        if forbidden in cleaned:
            cleaned = cleaned.replace(forbidden, '_')
    return cleaned
345
346
restricted_names = ('.', '..', '::', os.sep)
# Device names reserved by Windows: COM and LPT ports exist from 1 to 9.
nt_device_names = (
    'CON', 'PRN', 'AUX', 'NUL',
    'COM1', 'COM2', 'COM3', 'COM4', 'COM5', 'COM6', 'COM7', 'COM8', 'COM9',
    'LPT1', 'LPT2', 'LPT3', 'LPT4', 'LPT5', 'LPT6', 'LPT7', 'LPT8', 'LPT9',
    )
fs_encoding = 'unicode' if os.name == 'nt' else sys.getfilesystemencoding() or 'ascii'


def check_forbidden_filename(filename, destiny_os=os.name, fs_encoding=fs_encoding,
                             restricted_names=restricted_names):
    '''
    Get if given filename is forbidden for current OS or filesystem.

    For 'nt' the text before the first dot is compared, case-insensitively,
    against the reserved device names, so ``con.tar.gz`` is forbidden too.

    :param filename: filename to check
    :param destiny_os: destination operative system
    :param fs_encoding: destination filesystem filename encoding (not used
                        by the check itself)
    :param restricted_names: names forbidden on any destination OS
    :return: whether is forbidden on given OS (or filesystem) or not
    :rtype: bool
    '''
    if destiny_os == 'nt':
        fpc = filename.split('.', 1)[0].upper()
        if fpc in nt_device_names:
            return True

    return filename in restricted_names
366
367
def check_under_base(path, base, os_sep=os.sep):
    '''
    Tell whether an absolute path is the given base or lies below it.

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: whether file is under given base or not
    :rtype: bool
    '''
    if path == base:
        return True
    # Compare against base plus separator so '/ab' is not taken as under '/a'.
    if not base.endswith(os_sep):
        base = base + os_sep
    return path.startswith(base)
378
379
def secure_filename(path, destiny_os=os.name, fs_encoding=fs_encoding):
    '''
    Reduce an unsafe path to a filename safe for the destination.

    Parent path components are dropped, restricted characters replaced and,
    unless the filesystem encoding is 'unicode', characters which cannot be
    encoded with it are replaced by underscores.

    If path is invalid or protected, return empty string.

    :param path: unsafe path
    :param destiny_os: destination operative system
    :param fs_encoding: destination filesystem filename encoding
    :return: filename or empty string
    :rtype: str or unicode (depending on python version, destiny_os and fs_encoding)
    '''
    filename = clean_restricted_chars(generic_filename(path))

    forbidden = check_forbidden_filename(
        filename,
        destiny_os=destiny_os,
        fs_encoding=fs_encoding
        )
    if forbidden:
        return ''

    if fs_encoding == 'unicode':
        return filename

    if PY_LEGACY and not isinstance(filename, unicode):
        filename = unicode(filename, encoding='latin-1')
    # Round-trip through the filesystem encoding using the registered
    # underscore error handler for whatever cannot be encoded.
    encoded = filename.encode(fs_encoding, errors=undescore_replace)
    return encoded.decode(fs_encoding)
403
404
fs_safe_characters = string.ascii_uppercase + string.digits


def alternative_filename(filename, attempt=None):
    '''
    Build a variation of given filename by adding a marker before extensions.

    Up to two trailing dot-separated extensions are kept after the marker.
    The marker is the attempt number between parentheses or, when no attempt
    is given, eight random characters out of ``fs_safe_characters``.

    :param filename: original filename
    :param attempt: optional attempt number, defaults to null
    :return: new filename
    :rtype: str or unicode
    '''
    pieces = filename.rsplit('.', 2)
    stem, extensions = pieces[0], pieces[1:]
    if attempt is not None:
        marker = ' (%d)' % attempt
    else:
        token = ''.join([random.choice(fs_safe_characters) for _ in range(8)])
        marker = ' %s' % token
    suffix = ''.join(['.%s' % extension for extension in extensions])
    return '%s%s%s' % (stem, marker, suffix)
425