Completed
Push — master ( 664f3c...5c8ee4 )
by Felipe A.
59s
created

browsepy.file.File.from_urlpath()   A

Complexity

Conditions 1

Size

Total Lines 5

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 5
rs 9.4286
1
#!/usr/bin/env python
2
# -*- coding: UTF-8 -*-
3
4
import sys
5
import os
6
import os.path
7
import re
8
import shutil
9
import codecs
10
import threading
11
import string
12
import tarfile
13
import random
14
import datetime
15
16
from flask import current_app, send_from_directory, Response
17
from werkzeug.utils import cached_property
18
19
from ..compat import PY_LEGACY, range, FileNotFoundError
20
from .mimetype import detect_mimetype
21
22
# Name of the codec error handler registered below, namespaced with this
# module's name. It is passed as the ``errors`` argument when encoding.
undescore_replace = '%s:underscore' % __name__

# The handler substitutes an underscore for the offending character and
# resumes one position after the error start. The replacement literal is
# explicitly unicode on legacy python.
if PY_LEGACY:
    codecs.register_error(
        undescore_replace,
        lambda error: (u'_', error.start + 1)
        )
else:
    codecs.register_error(
        undescore_replace,
        lambda error: ('_', error.start + 1)
        )
28
29
class File(object):
    '''
    Filesystem entry (file or directory) bound to an application.

    Permissions, url generation and formatting are driven by the application
    config keys read by this class: ``directory_base``, ``directory_remove``,
    ``directory_upload``, ``directory_downloadable``,
    ``directory_tar_buffsize`` and ``use_binary_multiples``.

    Attributes declared with ``cached_property`` are computed on first access.
    '''
    # extracts the charset parameter out of a mimetype string
    re_charset = re.compile('; charset=(?P<charset>[^;]+)')

    def __init__(self, path, app=None):
        '''
        :param path: filesystem path of this entry
        :param app: application object, defaults to flask's current_app
        '''
        self.path = path
        self.app = current_app if app is None else app

    def remove(self):
        '''
        Remove this file, or this whole directory tree, from filesystem.

        :raises OutsideRemovableBase: if :attr:`can_remove` is false
        '''
        if not self.can_remove:
            raise OutsideRemovableBase("File outside removable base")
        if self.is_directory:
            shutil.rmtree(self.path)
        else:
            os.unlink(self.path)

    def download(self):
        '''
        Get a response object for downloading this entry.

        Directories are streamed as a :class:`TarFileStream`, anything else
        is sent as an attachment from its parent directory.

        :return: response object
        '''
        if self.is_directory:
            stream = TarFileStream(
                self.path,
                self.app.config["directory_tar_buffsize"]
                )
            return Response(stream, mimetype="application/octet-stream")
        directory, name = os.path.split(self.path)
        return send_from_directory(directory, name, as_attachment=True)

    def contains(self, filename):
        '''
        Get whether given filename exists inside this path.

        :param filename: name to look for
        :rtype: bool
        '''
        return os.path.exists(os.path.join(self.path, filename))

    def choose_filename(self, filename, attempts=999):
        '''
        Get a filename, based on given one, which does not exist inside this
        path.

        Given filename is tried first, then numbered alternatives (from 2 up
        to `attempts`), and finally random alternatives until one is free.

        :param filename: desired filename
        :param attempts: highest number used for numbered alternatives
        :return: filename not present inside this path
        '''
        new_filename = filename
        for attempt in range(2, attempts+1):
            if not self.contains(new_filename):
                return new_filename
            new_filename = alternative_filename(filename, attempt)
        # numbered names exhausted: fall back to random suffixes
        while self.contains(new_filename):
            new_filename = alternative_filename(filename)
        return new_filename

    @property
    def actions(self):
        '''
        Result of the application plugin manager ``get_actions`` call for
        this entry mimetype.
        '''
        plugin_manager = self.app.extensions['plugin_manager']
        return plugin_manager.get_actions(self.mimetype)

    @cached_property
    def can_download(self):
        '''
        Whether this entry can be downloaded: always for non-directories,
        depending on ``directory_downloadable`` config for directories.
        '''
        return self.app.config['directory_downloadable'] or not self.is_directory

    @cached_property
    def can_remove(self):
        '''
        Whether this path is strictly below the ``directory_remove`` config
        path (always false if that option is unset).
        '''
        dirbase = self.app.config["directory_remove"]
        if dirbase:
            return self.path.startswith(dirbase + os.sep)
        return False

    @cached_property
    def can_upload(self):
        '''
        Whether this is a directory equal to, or below, the
        ``directory_upload`` config path (always false if that is unset).
        '''
        dirbase = self.app.config["directory_upload"]
        if self.is_directory and dirbase:
            return dirbase == self.path or self.path.startswith(dirbase + os.sep)
        return False

    @cached_property
    def stats(self):
        '''
        Result of :func:`os.stat` on this path.
        '''
        return os.stat(self.path)

    @cached_property
    def mimetype(self):
        '''
        Mimetype string: ``inode/directory`` for directories, the result of
        ``detect_mimetype`` otherwise.
        '''
        if self.is_directory:
            return 'inode/directory'
        return detect_mimetype(self.path)

    @cached_property
    def is_directory(self):
        '''
        Whether this path is a directory.
        '''
        return os.path.isdir(self.path)

    @cached_property
    def is_file(self):
        '''
        Whether this path is a regular file.
        '''
        return os.path.isfile(self.path)

    @cached_property
    def is_empty(self):
        '''
        Whether this directory has no entries.
        '''
        # the cached listing is `_list`; no `_listdir` attribute exists
        return not self._list

    @cached_property
    def parent(self):
        '''
        Parent directory object, or None if this path is ``directory_base``.
        '''
        if self.path == self.app.config['directory_base']:
            return None
        return self.__class__(os.path.dirname(self.path), self.app)

    @cached_property
    def ancestors(self):
        '''
        Tuple of parent objects, nearest first, up to ``directory_base``.
        '''
        ancestors = []
        parent = self.parent
        while parent:
            ancestors.append(parent)
            parent = parent.parent
        return tuple(ancestors)

    @property
    def modified(self):
        '''
        Modification time formatted as ``%Y.%m.%d %H:%M:%S``.
        '''
        # modification time comes from the stat result; the class defines
        # no `mtime` attribute
        mtime = self.stats.st_mtime
        return datetime.datetime.fromtimestamp(mtime).strftime('%Y.%m.%d %H:%M:%S')

    @property
    def size(self):
        '''
        Human readable size: integer for plain bytes, two decimals for any
        bigger unit. Units depend on ``use_binary_multiples`` config.
        '''
        size, unit = fmt_size(self.stats.st_size, self.app.config["use_binary_multiples"])
        if unit == binary_units[0]:
            return "%d %s" % (size, unit)
        return "%.2f %s" % (size, unit)

    @property
    def urlpath(self):
        '''
        Path relative to ``directory_base`` using slash as separator.
        '''
        return abspath_to_urlpath(self.path, self.app.config['directory_base'])

    @property
    def name(self):
        '''
        Last component of this path.
        '''
        return os.path.basename(self.path)

    @property
    def type(self):
        '''
        Mimetype without parameters (everything before the first ``;``).
        '''
        return self.mimetype.split(";", 1)[0]

    @property
    def encoding(self):
        '''
        Charset parameter of mimetype, or ``default`` if there is none.
        '''
        if ";" in self.mimetype:
            match = self.re_charset.search(self.mimetype)
            gdict = match.groupdict() if match else {}
            return gdict.get("charset") or "default"
        return "default"

    @cached_property
    def _list(self):
        '''
        Sorted list of paths of the entries inside this directory.
        '''
        pjoin = os.path.join  # minimize list comprehension overhead
        content = [pjoin(self.path, i) for i in os.listdir(self.path)]
        content.sort(key=self._list_order)
        return content

    @classmethod
    def _list_order(cls, path):
        '''
        Sorting key: directories first, then case-insensitive name.
        '''
        return not os.path.isdir(path), os.path.basename(path).lower()

    def listdir(self):
        '''
        Iterate over the entries of this directory, in listing order.

        :return: generator of objects of this same class
        '''
        for path in self._list:
            yield self.__class__(path, self.app)

    @classmethod
    def from_urlpath(cls, path, app=None):
        '''
        Alternative constructor which accepts a path relative to the
        application ``directory_base``, as used on urls.

        :param path: relative path, slash-separated
        :param app: application object, defaults to flask's current_app
        :return: object of this class
        '''
        app = app or current_app
        base = app.config['directory_base']
        return cls(urlpath_to_abspath(path, base), app)
177
178
179
class TarFileStream(object):
    '''
    Tarfile which compresses while reading for streaming.

    Buffsize can be provided, it must be 512 multiple (the tar block size) for
    compression.

    A background thread writes the gzipped tar of given path into this
    object (used as the tarfile's file object), while the consumer pulls
    data with :meth:`read` or by iterating. Both sides are synchronized by
    two events: `_add` (reader allows writer to add data) and `_result`
    (writer notifies reader that data is ready, or that it finished).
    '''
    event_class = threading.Event
    thread_class = threading.Thread
    tarfile_class = tarfile.open

    def __init__(self, path, buffsize=10240):
        '''
        Start archiving given path on a background thread.

        :param path: path of file or directory to archive
        :param buffsize: buffer size passed to the tarfile stream
        '''
        self.path = path
        self.name = os.path.basename(path) + ".tgz"

        self._finished = 0  # 0: running, 1: writer done, 2: EOF delivered
        self._want = 0
        self._data = bytes()
        self._add = self.event_class()
        self._result = self.event_class()
        self._tarfile = self.tarfile_class(fileobj=self, mode="w|gz", bufsize=buffsize)  # stream write
        self._th = self.thread_class(target=self.fill)
        self._th.start()

    def fill(self):
        '''
        Writer thread target: archive the path, close the tarfile and wake up
        the reader.
        '''
        try:
            self._tarfile.add(self.path, "")
            self._tarfile.close()  # force stream flush
        finally:
            # always notify, so a failure while archiving does not leave the
            # reader waiting forever on `_result`
            self._finished += 1
            if not self._result.is_set():
                self._result.set()

    def write(self, data):
        '''
        File-like write, called by the tarfile from the writer thread.

        Blocks until the reader asks for data, then buffers it and notifies
        the reader once more than the wanted amount is available.

        :param data: bytes to append
        :return: number of bytes written
        :rtype: int
        '''
        self._add.wait()
        self._data += data
        if len(self._data) > self._want:
            self._add.clear()
            self._result.set()
        return len(data)

    def read(self, want=0):
        '''
        Get buffered data, blocking until the writer has some available.

        :param want: maximum number of bytes to return, everything buffered
                     if zero
        :return: data, empty once when the stream is finished
        :rtype: bytes
        :raises EOFError: if called again after the end of stream was
                          already returned
        '''
        if self._finished:
            if self._finished == 1:
                self._finished += 1
                return bytes()
            raise EOFError("EOF reached")

        # Thread communication
        self._want = want
        self._add.set()
        self._result.wait()
        self._result.clear()

        if want:
            data = self._data[:want]
            self._data = self._data[want:]
        else:
            data = self._data
            self._data = bytes()
        return data

    def __iter__(self):
        '''
        Iterate over data chunks until an empty one is read.
        '''
        data = self.read()
        while data:
            yield data
            data = self.read()
244
245
246
class OutsideDirectoryBase(Exception):
    '''
    Error raised when a path is not under the expected base directory.
    '''
248
249
250
class OutsideRemovableBase(Exception):
    '''
    Error raised when trying to remove a file outside the removable base.
    '''
252
253
254
binary_units = ("B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB")
standard_units = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")


def fmt_size(size, binary=True):
    '''
    Scale a byte count down to the biggest fitting unit.

    The value is divided (by 1024 for binary units, by 1000 for standard
    ones) until it gets below 1000 or the biggest unit is reached.

    :param size: size in bytes
    :param binary: whether use binary or standard units, defaults to True
    :return: size and unit
    :rtype: tuple of number (given size untouched if it needed no division,
            float otherwise) and unit as str
    '''
    units, divider = (binary_units, 1024.) if binary else (standard_units, 1000.)
    position = 0
    biggest = len(units) - 1
    # threshold is 1000 for both unit families
    while position < biggest and not size < 1000:
        size /= divider
        position += 1
    return size, units[position]
276
277
def root_path(path, os_sep=os.sep):
    '''
    Get the root component of given path.

    With backslash as separator, paths starting with a double slash yield
    the double slash followed by their first component. Otherwise the text
    before the first separator is returned, or a single slash if that is
    empty.

    :param path: absolute path
    :param os_sep: path component separator, defaults to current OS separator
    :return: path
    :rtype: str or unicode
    '''
    if os_sep == '\\' and path.startswith('//'):
        first_component = path[2:].split('/')[0]
        return '//%s' % first_component
    head = path.partition(os_sep)[0]
    if head:
        return head
    return '/'
289
290
def relativize_path(path, base, os_sep=os.sep):
    '''
    Make absolute path relative to an absolute base.

    Containment is checked on whole path components: a path only counts as
    below base if it equals base or starts with base followed by the
    separator. A character-wise common prefix is not enough, since it would
    accept siblings such as ``/base2`` for base ``/base``.

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: relative path (empty if path is base itself)
    :rtype: str or unicode
    :raises OutsideDirectoryBase: if path is not below base
    '''
    prefix = base if base.endswith(os_sep) else base + os_sep
    if not base or (path != base and not path.startswith(prefix)):
        raise OutsideDirectoryBase("%r is not under %r" % (path, base))
    # when path is base without trailing separator this slices past the end,
    # which yields an empty string
    return path[len(prefix):]
309
310
def abspath_to_urlpath(path, base, os_sep=os.sep):
    '''
    Convert an absolute filesystem path into an uri path relative to given
    absolute base path.

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: relative uri
    :rtype: str or unicode
    :raises OutsideDirectoryBase: if resulting path is not below base
    '''
    relative = relativize_path(path, base, os_sep)
    # uris always use slash, whatever the filesystem separator is
    urlpath = relative.replace(os_sep, '/')
    return urlpath
322
323
def urlpath_to_abspath(path, base, os_sep=os.sep):
    '''
    Convert an uri relative path into an absolute filesystem path below
    given absolute base path.

    The joined path is normalized with :func:`os.path.abspath` before
    checking it, so parent references cannot escape from base.

    :param path: relative path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: absolute path
    :rtype: str or unicode
    :raises OutsideDirectoryBase: if resulting path is not below base
    '''
    if base.endswith(os_sep):
        rooted = base
    else:
        rooted = base + os_sep
    candidate = os.path.abspath(rooted + path.replace('/', os_sep))
    if candidate != base and not candidate.startswith(rooted):
        raise OutsideDirectoryBase("%r is not under %r" % (candidate, base))
    return candidate
339
340
common_path_separators = '\\/'


def generic_filename(path):
    '''
    Get the filename part of given path whatever OS it comes from, by
    dropping everything up to the last occurrence of each known separator
    (backslash first, then slash).

    :param path: path
    :return: filename
    :rtype: str or unicode (depending on given path)
    '''
    filename = path
    for separator in common_path_separators:
        # rsplit yields the whole text if separator is absent
        filename = filename.rsplit(separator, 1)[-1]
    return filename
354
355
restricted_chars = '\\/\0'


def clean_restricted_chars(path, restricted_chars=restricted_chars):
    '''
    Replace every restricted character of given path by an underscore.

    :param path: path
    :param restricted_chars: characters to replace, defaults to backslash,
                             slash and the null character
    :return: path without restricted characters
    :rtype: str or unicode (depending on given path)
    '''
    cleaned = path
    for forbidden in restricted_chars:
        if forbidden in cleaned:
            cleaned = cleaned.replace(forbidden, '_')
    return cleaned
367
368
restricted_names = ('.', '..', '::', os.sep)
# Device names reserved by Windows: they cannot be used as filenames, with
# or without extension, in any letter case.
nt_device_names = (
    'CON', 'AUX', 'PRN', 'NUL',
    'COM1', 'COM2', 'COM3', 'COM4', 'COM5', 'COM6', 'COM7', 'COM8', 'COM9',
    'LPT1', 'LPT2', 'LPT3', 'LPT4', 'LPT5', 'LPT6', 'LPT7', 'LPT8', 'LPT9',
    )
fs_encoding = 'unicode' if os.name == 'nt' else sys.getfilesystemencoding() or 'ascii'


def check_forbidden_filename(filename, destiny_os=os.name, fs_encoding=fs_encoding,
                             restricted_names=restricted_names):
    '''
    Get if given filename is forbidden for current OS or filesystem.

    For `nt` destination, the name up to its first dot is compared
    case-insensitively against the reserved device names. For any
    destination, the whole filename is checked against restricted names.

    :param filename: filename to check
    :param destiny_os: destination operative system
    :param fs_encoding: destination filesystem filename encoding (unused by
                        the checks themselves)
    :param restricted_names: names which are always forbidden
    :return: whether is forbidden on given OS (or filesystem) or not
    :rtype: bool
    '''
    if destiny_os == 'nt':
        # device names stay reserved when followed by any extension
        fpc = filename.split('.', 1)[0].upper()
        if fpc in nt_device_names:
            return True

    return filename in restricted_names
388
389
def secure_filename(path, destiny_os=os.name, fs_encoding=fs_encoding):
    '''
    Reduce an unsafe path to a filename usable on the destination.

    Parent path components are dropped and restricted characters replaced.
    Unless filesystem encoding is `unicode`, characters not supported by it
    are replaced by underscores.

    If path is invalid or protected, return empty string.

    :param path: unsafe path
    :param destiny_os: destination operative system
    :param fs_encoding: destination filesystem filename encoding
    :return: filename or empty string
    :rtype: str or unicode (depending on python version, destiny_os and fs_encoding)
    '''
    filename = clean_restricted_chars(generic_filename(path))

    if check_forbidden_filename(filename, destiny_os=destiny_os, fs_encoding=fs_encoding):
        return ''

    if fs_encoding == 'unicode':
        return filename

    if PY_LEGACY and not isinstance(filename, unicode):
        filename = unicode(filename, encoding='latin-1')
    # round trip through the filesystem encoding, with the handler which
    # turns unsupported characters into underscores
    encoded = filename.encode(fs_encoding, errors=undescore_replace)
    return encoded.decode(fs_encoding)
413
414
fs_safe_characters = string.ascii_uppercase + string.digits


def alternative_filename(filename, attempt=None):
    '''
    Build an alternative version of given filename by inserting a suffix
    before its extension (at most the last two dot-separated parts).

    If an number attempt parameter is given, it will be used on the
    alternative name as ` (number)`, a space followed by eight random
    characters will be used otherwise.

    :param filename: original filename
    :param attempt: optional attempt number, defaults to null
    :return: new filename
    :rtype: str or unicode
    '''
    stem = filename.rsplit('.', 2)[0]
    # whatever follows the stem is the extension, leading dot included
    extension = filename[len(stem):]
    if attempt is not None:
        suffix = ' (%d)' % attempt
    else:
        randomness = [random.choice(fs_safe_characters) for _ in range(8)]
        suffix = ' %s' % ''.join(randomness)
    return '%s%s%s' % (stem, suffix, extension)
435