Completed
Push — master ( d7c118...c18a49 )
by Felipe A.
52s
created

browsepy.clean_restricted_chars()   A

Complexity

Conditions 2

Size

Total Lines 11

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 2
dl 0
loc 11
rs 9.4286
1
2
import sys
3
import os
4
import os.path
5
import re
6
import shutil
7
import codecs
8
import threading
9
import string
10
import tarfile
11
import random
12
import subprocess
13
import mimetypes
14
import datetime
15
16
from flask import current_app, send_from_directory, Response
17
from werkzeug.utils import cached_property
18
19
from .compat import PY_LEGACY, range, FileNotFoundError
20
21
# Name under which the underscore-substituting codec error handler is
# registered (module-qualified to avoid clashing with other handlers).
undescore_replace = '%s:underscore' % __name__

# The handler swaps a single offending character for an underscore and
# resumes right after it; on legacy python the replacement is explicitly
# a unicode literal.
if PY_LEGACY:
    _underscore_handler = lambda error: (u'_', error.start + 1)
else:
    _underscore_handler = lambda error: ('_', error.start + 1)

codecs.register_error(undescore_replace, _underscore_handler)
27
28
class File(object):
    '''
    Filesystem entry, either a regular file or a directory.

    :param path: filesystem path of the entry
    :param app: application object whose ``config`` and ``actions`` are used,
                defaults to flask ``current_app``
    '''
    # raw strings so regex escapes are not parsed as string escapes
    re_mime_validate = re.compile(r'\w+/\w+(; \w+=[^;]+)*')
    re_charset = re.compile(r'; charset=(?P<charset>[^;]+)')

    # mimetypes considered too vague, triggering detection via 'file' command
    _generic_mimetypes = {
        None,
        'application/octet-stream',
        }

    def __init__(self, path, app=None):
        self.path = path
        self.app = current_app if app is None else app

    def remove(self):
        '''
        Delete this entry from filesystem (recursively for directories).

        :raises OutsideRemovableBase: if entry is not removable
        '''
        if not self.can_remove:
            raise OutsideRemovableBase("File outside removable base")
        if self.is_directory:
            shutil.rmtree(self.path)
        else:
            os.unlink(self.path)

    def download(self):
        '''
        Get a response serving this entry: directories are streamed as a
        compressed tarball, files are sent as attachments.

        :return: response object
        '''
        if self.is_directory:
            stream = TarFileStream(
                self.path,
                self.app.config["directory_tar_buffsize"]
                )
            return Response(stream, mimetype="application/octet-stream")
        directory, name = os.path.split(self.path)
        return send_from_directory(directory, name, as_attachment=True)

    def contains(self, filename):
        '''
        Get whether given filename exists inside this directory.

        :param filename: name relative to this entry path
        :rtype: bool
        '''
        return os.path.exists(os.path.join(self.path, filename))

    def choose_filename(self, filename, attempts=999):
        '''
        Get a filename which does not exist inside this directory.

        Given filename is tried first, then the numbered alternatives
        generated by alternative_filename (from 2 up to attempts), and
        finally its random alternatives until a free one is found.

        :param filename: desired filename
        :param attempts: highest attempt number tried, defaults to 999
        :return: filename not present on this directory
        '''
        new_filename = filename
        for attempt in range(2, attempts+1):
            if not self.contains(new_filename):
                return new_filename
            new_filename = alternative_filename(filename, attempt)
        while self.contains(new_filename):
            new_filename = alternative_filename(filename)
        return new_filename

    @property
    def actions(self):
        '''
        Actions the application has registered for this entry mimetype.
        '''
        return self.app.actions.get(self.mimetype)

    @cached_property
    def can_download(self):
        '''
        Whether entry can be downloaded: files always, directories only if
        'directory_downloadable' config option is enabled.
        '''
        return self.app.config['directory_downloadable'] or not self.is_directory

    @cached_property
    def can_remove(self):
        '''
        Whether entry is strictly below 'directory_remove' config path.
        '''
        dirbase = self.app.config["directory_remove"]
        if dirbase:
            return self.path.startswith(dirbase + os.sep)
        return False

    @cached_property
    def can_upload(self):
        '''
        Whether entry is a directory at or below 'directory_upload' config
        path.
        '''
        dirbase = self.app.config["directory_upload"]
        if self.is_directory and dirbase:
            return dirbase == self.path or self.path.startswith(dirbase + os.sep)
        return False

    @cached_property
    def stats(self):
        '''
        Result of os.stat for this entry (cached).
        '''
        return os.stat(self.path)

    @cached_property
    def mimetype(self):
        '''
        Mimetype string, optionally followed by '; ' and extra parameters.

        Filename-based guess is used unless it is too generic, in which case
        the output of the 'file' command is used when available and valid.
        '''
        mime, encoding = mimetypes.guess_type(self.path)
        mimetype = "%s%s%s" % (mime or "application/octet-stream", "; " if encoding else "", encoding or "")
        if mime in self._generic_mimetypes:
            try:
                output = subprocess.check_output(("file", "-ib", self.path)).decode('utf8').strip()
                if self.re_mime_validate.match(output):
                    # 'file' command can return status zero with invalid output
                    mimetype = output
            except (subprocess.CalledProcessError, FileNotFoundError):
                # command failed or is not installed: keep filename-based guess
                pass
        return mimetype

    @cached_property
    def is_directory(self):
        '''
        Whether entry is a directory (or a symlink pointing to one).

        The filesystem is asked directly instead of relying on mimetype, as
        mimetype is guessed from the filename first (a directory named
        'folder.txt' would look like a text file) and content detection
        depends on an external command which may be unavailable.
        '''
        return os.path.isdir(self.path)

    @cached_property
    def parent(self):
        '''
        Entry for the parent directory, bound to the same application.
        '''
        # propagate app so an explicitly given application is not lost
        return self.__class__(os.path.dirname(self.path), self.app)

    @property
    def mtime(self):
        '''
        Modification time as given by stats (st_mtime).
        '''
        return self.stats.st_mtime

    @property
    def modified(self):
        '''
        Modification time formatted as 'YYYY.mm.dd HH:MM:SS'.
        '''
        return datetime.datetime.fromtimestamp(self.mtime).strftime('%Y.%m.%d %H:%M:%S')

    @property
    def size(self):
        '''
        Human readable size: integer for bytes, two decimals for bigger units.
        '''
        size, unit = fmt_size(self.stats.st_size, self.app.config["use_binary_multiples"])
        if unit == binary_units[0]:
            return "%d %s" % (size, unit)
        return "%.2f %s" % (size, unit)

    @property
    def relpath(self):
        '''
        Path relative to 'directory_base' config path.
        '''
        return relativize_path(self.path, self.app.config['directory_base'])

    @property
    def basename(self):
        '''
        Last component of entry path.
        '''
        return os.path.basename(self.path)

    @property
    def dirname(self):
        '''
        Entry path without its last component.
        '''
        return os.path.dirname(self.path)

    @property
    def type(self):
        '''
        Mimetype without parameters (everything before first semicolon).
        '''
        return self.mimetype.split(";", 1)[0]

    @property
    def encoding(self):
        '''
        Charset parameter of mimetype, or 'default' if not present.
        '''
        if ";" in self.mimetype:
            match = self.re_charset.search(self.mimetype)
            gdict = match.groupdict() if match else {}
            return gdict.get("charset") or "default"
        return "default"

    @classmethod
    def listdir_order(cls, path):
        '''
        Sorting key for directory listing: directories first, then
        case-insensitive name.

        :param path: path of listed entry
        :return: tuple of bool and lowercase basename
        '''
        return not os.path.isdir(path), os.path.basename(path).lower()

    def listdir(self):
        '''
        Iterate over directory content, sorted using listdir_order.

        :return: generator of entries bound to the same application
        '''
        pjoin = os.path.join # minimize list comprehension overhead
        content = [pjoin(self.path, i) for i in os.listdir(self.path)]
        content.sort(key=self.listdir_order)
        for i in content:
            # propagate app so an explicitly given application is not lost
            yield self.__class__(i, self.app)
169
170
171
class TarFileStream(object):
    '''
    Tarfile which compresses while reading for streaming.

    Buffsize can be provided, it must be 512 multiple (the tar block size) for
    compression.

    The tarball is written by a background thread using this object as its
    output file; written data is handed over to whoever calls read (or
    iterates this object) using a pair of events.

    :param path: path of directory (or file) to archive
    :param buffsize: tarfile buffer size, defaults to 10240
    '''
    event_class = threading.Event
    thread_class = threading.Thread
    tarfile_class = tarfile.open

    def __init__(self, path, buffsize=10240):
        self.path = path
        self.name = os.path.basename(path) + ".tgz"

        self._finished = 0  # 0: writing, 1: finished, 2: end already notified
        self._want = 0
        self._data = bytes()
        self._add = self.event_class()  # set by reader when data is wanted
        self._result = self.event_class()  # set by writer when data is ready
        self._tarfile = self.tarfile_class(fileobj=self, mode="w|gz", bufsize=buffsize) # stream write
        self._th = self.thread_class(target=self.fill)
        self._th.start()

    def fill(self):
        '''
        Write the whole tarball (thread target), then wake up the reader.
        '''
        self._tarfile.add(self.path, "")
        self._tarfile.close() # force stream flush
        self._finished += 1
        if not self._result.is_set():
            self._result.set()

    def write(self, data):
        '''
        Buffer data written by tarfile, blocking until a read requests it.

        :param data: compressed bytes
        :return: number of bytes written
        :rtype: int
        '''
        self._add.wait()
        self._data += data
        if len(self._data) > self._want:
            self._add.clear()
            self._result.set()
        return len(data)

    def read(self, want=0):
        '''
        Get compressed data, blocking until the writer has produced it.

        Once the tarball is finished, empty bytes are returned once to notify
        the end of data.

        :param want: number of bytes wanted, all available data if zero
        :return: compressed bytes
        :rtype: bytes
        :raises EOFError: if called again after end of data was notified
        '''
        if self._finished:
            if self._finished == 1:
                self._finished += 1
                # same type as the rest of returned data
                return bytes()
            raise EOFError("EOF reached")

        # Thread communication
        self._want = want
        self._add.set()
        self._result.wait()
        self._result.clear()

        if want:
            data = self._data[:want]
            self._data = self._data[want:]
        else:
            data = self._data
            self._data = bytes()
        return data

    def __iter__(self):
        '''
        Iterate over compressed chunks until no more data is available.
        '''
        data = self.read()
        while data:
            yield data
            data = self.read()
236
237
238
class OutsideDirectoryBase(Exception):
    '''
    Exception raised when a path is not under the directory base.
    '''
240
241
242
class OutsideRemovableBase(Exception):
    '''
    Exception raised when removing a file outside the removable base.
    '''
244
245
246
binary_units = ("B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB")
standard_units = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
def fmt_size(size, binary=True):
    '''
    Scale a byte count down to the first unit in which it is below 1000.

    :param size: size in bytes
    :param binary: divide by 1024 and use binary unit names if True (the
                   default), divide by 1000 and use standard names otherwise
    :return: scaled size (untouched if no division was needed, float
             otherwise) and unit name
    :rtype: tuple of number and str
    '''
    units, step = (binary_units, 1024.) if binary else (standard_units, 1000.)
    last = len(units) - 1
    index = 0
    while index < last:
        if size < 1000:
            return (size, units[index])
        size /= step
        index += 1
    # biggest unit takes whatever remains
    return size, units[last]
268
269
def root_path(path, os_sep=os.sep):
    '''
    Extract the root component of given path.

    With backslash as separator, a path starting with two slashes yields
    those slashes plus the first component after them; otherwise the result
    is whatever precedes the first separator, or '/' if that is empty.

    :param path: absolute path
    :param os_sep: path component separator, defaults to current OS separator
    :return: path
    :rtype: str or unicode
    '''
    is_double_slash = os_sep == '\\' and path.startswith('//')
    if is_double_slash:
        host = path[2:].partition('/')[0]
        return '//' + host
    first = path.partition(os_sep)[0]
    return first if first else '/'
281
282
def relativize_path(path, base, os_sep=os.sep):
    '''
    Make absolute path relative to an absolute base.

    Whole path components are compared: a character-wise common prefix is
    not enough, as it would accept siblings sharing the beginning of their
    name with base (like '/parent2/x' for base '/parent').

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: relative path (empty if path is the base itself)
    :rtype: str or unicode
    :raises OutsideDirectoryBase: if path is not below base
    '''
    # base without trailing separator (empty when base is the root itself)
    root = base[:-len(os_sep)] if base.endswith(os_sep) else base
    is_base_itself = bool(root) and path == root
    is_below_base = bool(base) and path.startswith(root + os_sep)
    if not (is_base_itself or is_below_base):
        raise OutsideDirectoryBase("%r is not under %r" % (path, base))
    return path[len(root) + len(os_sep):]
301
302
common_path_separators = '\\/'
def generic_filename(path):
    '''
    Get the filename of given path regardless of the OS it comes from.

    Every known separator (backslash and slash, in that order) is applied,
    each time keeping only what follows its last occurrence.

    :param path: path
    :return: filename
    :rtype: str or unicode (depending on given path)
    '''
    filename = path
    for separator in common_path_separators:
        # rsplit leaves the value untouched if separator is not present
        filename = filename.rsplit(separator, 1)[-1]
    return filename
316
317
restricted_chars = '\\/\0'
def clean_restricted_chars(path, restricted_chars=restricted_chars):
    '''
    Replace every restricted character of given path with an underscore.

    :param path: path
    :param restricted_chars: characters to replace, defaults to backslash,
                             slash and the null character
    :return: path without restricted characters
    :rtype: str or unicode (depending on given path)
    '''
    cleaned = path
    for forbidden in restricted_chars:
        cleaned = cleaned.replace(forbidden, '_')
    return cleaned
329
330
restricted_names = ('.', '..', '::', os.sep)
# Device names reserved by Windows, with or without extension: the full
# COM1-COM9 and LPT1-LPT9 ranges are reserved, not only the first ports.
nt_device_names = (
    'CON', 'PRN', 'AUX', 'NUL',
    'COM1', 'COM2', 'COM3', 'COM4', 'COM5', 'COM6', 'COM7', 'COM8', 'COM9',
    'LPT1', 'LPT2', 'LPT3', 'LPT4', 'LPT5', 'LPT6', 'LPT7', 'LPT8', 'LPT9',
    )
fs_encoding = 'unicode' if os.name == 'nt' else sys.getfilesystemencoding() or 'ascii'
def check_forbidden_filename(filename, destiny_os=os.name, fs_encoding=fs_encoding,
                             restricted_names=restricted_names):
    '''
    Get if given filename is forbidden for current OS or filesystem.

    :param filename: filename (without any directory component)
    :param destiny_os: destination operative system
    :param fs_encoding: destination filesystem filename encoding (accepted
                        for signature compatibility, not used by the check)
    :param restricted_names: filenames forbidden on every OS
    :return: whether is forbidden on given OS (or filesystem) or not
    :rtype: bool
    '''
    if destiny_os == 'nt':
        # device names are reserved case-insensitively, whatever extension
        fpc = filename.split('.', 1)[0].upper()
        if fpc in nt_device_names:
            return True

    return filename in restricted_names
350
351
def secure_filename(path, destiny_os=os.name, fs_encoding=fs_encoding):
    '''
    Turn an unsafe path into a filename safe for destination filesystem.

    Parent path components are dropped and restricted characters replaced;
    unless filesystem encoding is 'unicode', characters which cannot be
    encoded with it are replaced by underscores.

    If path is invalid or protected, return empty string.

    :param path: unsafe path
    :param destiny_os: destination operative system
    :param fs_encoding: destination filesystem filename encoding
    :return: filename or empty string
    :rtype: str or unicode (depending on python version, destiny_os and fs_encoding)
    '''
    filename = clean_restricted_chars(generic_filename(path))

    forbidden = check_forbidden_filename(
        filename,
        destiny_os=destiny_os,
        fs_encoding=fs_encoding
        )
    if forbidden:
        return ''

    if fs_encoding == 'unicode':
        # filesystem takes any character, nothing else to do
        return filename

    if PY_LEGACY and not isinstance(filename, unicode):
        filename = unicode(filename, encoding='latin-1')
    # encoding roundtrip swaps unencodable characters for underscores
    encoded = filename.encode(fs_encoding, errors=undescore_replace)
    return encoded.decode(fs_encoding)
375
376
fs_safe_characters = string.ascii_uppercase + string.digits
def alternative_filename(filename, attempt=None):
    '''
    Build a variation of given filename, keeping up to two extensions.

    The variation is inserted between name and extensions: the attempt
    number between parentheses if given, eight random characters (uppercase
    letters and digits) otherwise.

    :param filename: original filename
    :param attempt: optional attempt number, defaults to null
    :return: new filename
    :rtype: str or unicode
    '''
    pieces = filename.rsplit('.', 2)
    stem, suffixes = pieces[0], pieces[1:]
    if attempt is None:
        token = ''.join(random.choice(fs_safe_characters) for _ in range(8))
        marker = ' %s' % token
    else:
        marker = ' (%d)' % attempt
    extension = ''.join('.' + suffix for suffix in suffixes)
    return stem + marker + extension
397