Completed
Branch dev-4.1 (fc3790)
by Felipe A.
01:10
created

File.listdir()   B

Complexity

Conditions 5

Size

Total Lines 27

Duplication

Lines 0
Ratio 0 %

Importance

Changes 7
Bugs 0 Features 2
Metric Value
cc 5
c 7
b 0
f 2
dl 0
loc 27
rs 8.0894
1
#!/usr/bin/env python
2
# -*- coding: UTF-8 -*-
3
4
import sys
5
import os
6
import os.path
7
import re
8
import shutil
9
import codecs
10
import threading
11
import string
12
import tarfile
13
import random
14
import datetime
15
import functools
16
import logging
17
import warnings
18
19
from flask import current_app, send_from_directory, Response
20
from werkzeug.utils import cached_property
21
22
from . import compat
23
from .compat import range
24
25
26
logger = logging.getLogger(__name__)

# Replacement character as text, whatever the running python major version
if compat.PY_LEGACY:
    unicode_underscore = '_'.decode('utf-8')
else:
    unicode_underscore = '_'

# Name under which the codec error handler below gets registered
underscore_replace = '%s:underscore' % __name__


def _underscore_error_handler(error):
    '''
    Codec error handler: emit an underscore and resume one position after
    the start of the offending data.
    '''
    return (unicode_underscore, error.start + 1)


codecs.register_error(underscore_replace, _underscore_error_handler)

# Unit names ordered by magnitude, consumed by fmt_size
binary_units = (
    "B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB",
    )
standard_units = (
    "B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB",
    )

# Filename sanitization tables
common_path_separators = '\\/'
restricted_chars = '\\/\0'
restricted_names = ('.', '..', '::', os.sep)
nt_device_names = (
    'CON', 'AUX', 'COM1', 'COM2', 'COM3', 'COM4', 'LPT1', 'LPT2', 'LPT3',
    'PRN', 'NUL',
    )

# Alphabet used for randomly generated filename suffixes
fs_safe_characters = string.ascii_uppercase + string.digits
40
41
42
class File(object):
    '''
    Filesystem node (regular file or directory) exposed by the application.

    Most properties are computed from the filesystem or from the application
    config on access. Extra keyword arguments given to the constructor are
    stored in the instance dictionary; :meth:`listdir` uses this to hand
    already known values (`is_directory`, `parent` and `stats`) to the
    objects it creates.
    '''
    re_charset = re.compile('; charset=(?P<charset>[^;]+)')
    parent_class = None  # none means current class
    _listdir_cache = None

    def __init__(self, path=None, app=None, **defaults):
        '''
        :param path: filesystem path, optional
        :param app: optional, flask application (defaults to current app)
        :param defaults: extra attributes set on the instance as given
        '''
        self.path = compat.fsdecode(path) if path else None
        self.app = current_app if app is None else app
        self.__dict__.update(defaults)

    def remove(self):
        '''
        Remove file or directory (recursively) from the filesystem.

        :raises OutsideRemovableBase: if `can_remove` is false
        '''
        if not self.can_remove:
            raise OutsideRemovableBase("File outside removable base")
        if self.is_directory:
            shutil.rmtree(self.path)
        else:
            os.unlink(self.path)

    def download(self):
        '''
        Get a response which sends this file to the client.

        Directories are streamed as a gzipped tarball, regular files are sent
        as attachment.

        :return: flask response
        '''
        if self.is_directory:
            stream = TarFileStream(
                self.path,
                self.app.config["directory_tar_buffsize"]
                )
            return Response(stream, mimetype="application/octet-stream")
        directory, name = os.path.split(self.path)
        return send_from_directory(directory, name, as_attachment=True)

    def contains(self, filename):
        '''
        Check if given filename exists under this path.

        :param filename: name relative to this path
        :return: whether it exists or not
        :rtype: bool
        '''
        return os.path.exists(os.path.join(self.path, filename))

    def choose_filename(self, filename, attempts=999):
        '''
        Get a filename, based on the given one, which does not exist yet
        under this path.

        The given filename is tried first, then numbered alternatives (from 2
        up to `attempts`), and finally randomly generated alternatives until
        an unused one is found.

        :param filename: desired filename
        :param attempts: highest number tried for numbered alternatives
        :return: filename not present under this path
        '''
        new_filename = filename
        for attempt in range(2, attempts+1):
            if not self.contains(new_filename):
                return new_filename
            new_filename = alternative_filename(filename, attempt)
        while self.contains(new_filename):
            new_filename = alternative_filename(filename)
        return new_filename

    @property
    def plugin_manager(self):
        '''
        Plugin manager registered on the application extensions.
        '''
        return self.app.extensions['plugin_manager']

    @property
    def default_action(self):
        '''
        First action whose widget is placed as 'link', or a newly created
        'browse' (directories) or 'open' (anything else) action otherwise.
        '''
        for action in self.actions:
            if action.widget.place == 'link':
                return action
        endpoint = 'browse' if self.is_directory else 'open'
        widget = self.plugin_manager.link_class.from_file(self)
        return self.plugin_manager.action_class(endpoint, widget)

    @cached_property
    def actions(self):
        '''
        Actions given by the plugin manager for this file.
        '''
        return self.plugin_manager.get_actions(self)

    @cached_property
    def can_download(self):
        '''
        Whether this file can be downloaded: always for non-directories, for
        directories only if config 'directory_downloadable' is enabled.
        '''
        return self.app.config['directory_downloadable'] or \
               not self.is_directory

    @cached_property
    def can_remove(self):
        '''
        Whether this path lies strictly below config 'directory_remove'.
        '''
        dirbase = self.app.config["directory_remove"]
        if dirbase:
            return self.path.startswith(dirbase + os.sep)
        return False

    @cached_property
    def can_upload(self):
        '''
        Whether this is a directory which is, or lies below, config
        'directory_upload'.
        '''
        dirbase = self.app.config["directory_upload"]
        if self.is_directory and dirbase:
            return dirbase == self.path or \
                   self.path.startswith(dirbase + os.sep)
        return False

    @cached_property
    def stats(self):
        '''
        Result of `os.stat` on this path.
        '''
        return os.stat(self.path)

    @cached_property
    def mimetype(self):
        '''
        Mimetype as given by the plugin manager, 'inode/directory' for
        directories.
        '''
        if self.is_directory:
            return 'inode/directory'
        return self.plugin_manager.get_mimetype(self.path)

    @cached_property
    def is_directory(self):
        '''
        Whether this path is a directory.
        '''
        return os.path.isdir(self.path)

    @cached_property
    def is_file(self):
        '''
        Whether this path is a regular file.
        '''
        return os.path.isfile(self.path)

    @cached_property
    def is_empty(self):
        '''
        Whether this directory has no entries.

        :rtype: bool
        '''
        if self._listdir_cache is not None:
            # listing already available: empty directory means empty listing
            return not self._listdir_cache
        for entry in compat.scandir(self.path):
            # a single entry is enough to know it is not empty
            return False
        return True

    @cached_property
    def parent(self):
        '''
        File object for the parent directory, or None if this path is the
        config 'directory_base' itself.
        '''
        if self.path == self.app.config['directory_base']:
            return None
        parent_class = self.parent_class or self.__class__
        return parent_class(os.path.dirname(self.path), self.app)

    @cached_property
    def ancestors(self):
        '''
        List of parent File objects, starting from the closest one.
        '''
        ancestors = []
        parent = self.parent
        while parent:
            ancestors.append(parent)
            parent = parent.parent
        return ancestors

    @cached_property
    def raw_listdir(self):
        '''
        Deprecated: result of `os.listdir` on this path.
        '''
        warnings.warn(
            "Deprecated property File.raw_listdir",
            DeprecationWarning
        )
        return os.listdir(self.path)

    @property
    def modified(self):
        '''
        Modification time formatted as 'YYYY.mm.dd HH:MM:SS'.
        '''
        dt = datetime.datetime.fromtimestamp(self.stats.st_mtime)
        return dt.strftime('%Y.%m.%d %H:%M:%S')

    @property
    def size(self):
        '''
        Human readable size, using binary or standard units depending on
        config 'use_binary_multiples'.
        '''
        size, unit = fmt_size(
            self.stats.st_size,
            self.app.config["use_binary_multiples"]
            )
        if unit == binary_units[0]:
            # plain bytes are shown without decimals
            return "%d %s" % (size, unit)
        return "%.2f %s" % (size, unit)

    @property
    def urlpath(self):
        '''
        Path relative to config 'directory_base' using '/' as separator.
        '''
        return abspath_to_urlpath(self.path, self.app.config['directory_base'])

    @property
    def name(self):
        '''
        Last component of the path.
        '''
        return os.path.basename(self.path)

    @property
    def type(self):
        '''
        Mimetype without parameters (anything after the first ';').
        '''
        return self.mimetype.split(";", 1)[0]

    @property
    def encoding(self):
        '''
        Charset parameter of the mimetype, "default" if there is none.
        '''
        if ";" in self.mimetype:
            match = self.re_charset.search(self.mimetype)
            gdict = match.groupdict() if match else {}
            return gdict.get("charset") or "default"
        return "default"

    def listdir(self):
        '''
        Get sorted list (by `is_directory` and `name` properties) of File
        objects.

        The result is kept on the instance and returned as is by subsequent
        calls.

        :return: sorted list of File instances
        :rtype: list of File
        '''
        if self._listdir_cache is not None:
            return self._listdir_cache

        # stats from directory entries are only handed over on NT
        precomputed_stats = os.name == 'nt'
        content = [
            self.__class__(
                path=entry.path,
                app=self.app,
                is_directory=entry.is_dir(),
                parent=self,
                **({'stats': entry.stat()}
                   if precomputed_stats and not entry.is_symlink() else
                   {})
                )
            for entry in compat.scandir(self.path)
            ]
        # directories first, then case-insensitive name order
        content.sort(key=lambda f: (not f.is_directory, f.name.lower()))
        self._listdir_cache = content
        return content

    @classmethod
    def from_urlpath(cls, path, app=None):
        '''
        Alternative constructor which accepts a path as taken from URL and uses
        the given app or the current app config to get the real path.

        :param path: relative path as from URL
        :param app: optional, flask application
        :return: file object pointing to path
        :rtype: File
        '''
        app = app or current_app
        base = app.config['directory_base']
        return cls(path=urlpath_to_abspath(path, base), app=app)
247
248
249
class TarFileStream(object):
    '''
    Tarfile which compresses while reading for streaming.

    Buffsize can be provided, it must be 512 multiple (the tar block size) for
    compression.

    A worker thread writes the gzipped tarball into this object (used as the
    tarfile output file object) while the consumer gets the data by calling
    :meth:`read` or by iterating. Two events keep both sides in step: `_add`
    allows the worker to write, `_result` tells the reader data is available.
    '''
    event_class = threading.Event
    thread_class = threading.Thread
    tarfile_class = tarfile.open

    def __init__(self, path, buffsize=10240):
        '''
        Start compressing given path on a worker thread.

        :param path: path of file or directory to archive
        :param buffsize: tarfile stream buffer size, defaults to 10240
        '''
        self.path = path
        self.name = os.path.basename(path) + ".tgz"

        self._finished = 0  # 0: working, 1: done, 2: end already notified
        self._want = 0
        self._data = bytes()
        self._add = self.event_class()
        self._result = self.event_class()
        self._tarfile = self.tarfile_class(
            fileobj=self,
            mode="w|gz",
            bufsize=buffsize
            )  # stream write
        self._th = self.thread_class(target=self.fill)
        self._th.start()

    def fill(self):
        '''
        Worker thread target: add path to the tarfile, close it and wake up
        the reader.
        '''
        self._tarfile.add(self.path, "")
        self._tarfile.close()  # force stream flush
        self._finished += 1
        if not self._result.is_set():
            self._result.set()

    def write(self, data):
        '''
        Buffer data coming from the tarfile, blocking until a reader asks for
        it.

        :param data: compressed chunk
        :return: number of bytes taken
        :rtype: int
        '''
        self._add.wait()
        self._data += data
        if len(self._data) > self._want:
            # enough data for the reader: stop writing and hand it over
            self._add.clear()
            self._result.set()
        return len(data)

    def _take(self, want):
        '''
        Remove and return up to `want` buffered bytes (everything if zero).
        '''
        if want:
            data, self._data = self._data[:want], self._data[want:]
        else:
            data, self._data = self._data, bytes()
        return data

    def read(self, want=0):
        '''
        Get compressed data, blocking until the worker thread provides it.

        Once all data has been consumed an empty bytes object is returned
        once, further calls raise EOFError.

        :param want: maximum number of bytes, zero for all available
        :return: compressed chunk
        :rtype: bytes
        :raises EOFError: if called again after the end has been notified
        '''
        if self._finished:
            if self._data:
                # worker is done: hand over what is still buffered
                return self._take(want)
            if self._finished == 1:
                self._finished += 1
                return bytes()
            raise EOFError("EOF reached")

        # Thread communication
        self._want = want
        self._add.set()
        self._result.wait()
        self._result.clear()

        return self._take(want)

    def __iter__(self):
        '''
        Iterate over compressed chunks until no more data is available.
        '''
        data = self.read()
        while data:
            yield data
            data = self.read()
318
319
320
class OutsideDirectoryBase(Exception):
    '''
    Raised when a path is not under the directory base it is checked against.
    '''
322
323
324
class OutsideRemovableBase(Exception):
    '''
    Raised when trying to remove a file which is outside the removable base.
    '''
326
327
328
def fmt_size(size, binary=True):
    '''
    Scale a size in bytes down to a suitable unit.

    The size is divided (by 1024 for binary units, by 1000 for standard
    ones) until it is lower than 1000 or the biggest unit is reached.

    :param size: size in bytes
    :param binary: whether use binary or standard units, defaults to True
    :return: scaled size and unit name
    :rtype: tuple of number and unit as str
    '''
    if binary:
        units, divider = binary_units, 1024.
    else:
        units, divider = standard_units, 1000.
    biggest = units[-1]
    for unit in units[:-1]:
        if size < 1000:
            return (size, unit)
        size /= divider
    return size, biggest
348
349
350
def relativize_path(path, base, os_sep=os.sep):
    '''
    Strip an absolute base from an absolute path.

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: path relative to base
    :rtype: str or unicode
    :raises OutsideDirectoryBase: if path is not below base
    '''
    if check_under_base(path, base, os_sep):
        skip = len(base)
        if not base.endswith(os_sep):
            # also drop the separator between base and the relative part
            skip += len(os_sep)
        return path[skip:]
    raise OutsideDirectoryBase("%r is not under %r" % (path, base))
367
368
369
def abspath_to_urlpath(path, base, os_sep=os.sep):
    '''
    Convert an absolute filesystem path into an uri path relative to the
    given absolute base path.

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: relative uri, with '/' as separator
    :rtype: str or unicode
    :raises OutsideDirectoryBase: if resulting path is not below base
    '''
    relative = relativize_path(path, base, os_sep)
    return relative.replace(os_sep, '/')
381
382
383
def urlpath_to_abspath(path, base, os_sep=os.sep):
    '''
    Convert an uri relative path into an absolute filesystem path under the
    given absolute base path.

    :param path: relative path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: absolute path
    :rtype: str or unicode
    :raises OutsideDirectoryBase: if resulting path is not below base
    '''
    if base.endswith(os_sep):
        prefix = base
    else:
        prefix = base + os_sep
    fs_relative = path.replace('/', os_sep)
    # abspath collapses '..' components, so escaping base gets detected below
    realpath = os.path.abspath(prefix + fs_relative)
    if realpath != base and not realpath.startswith(prefix):
        raise OutsideDirectoryBase("%r is not under %r" % (realpath, base))
    return realpath
399
400
401
def generic_filename(path):
    '''
    Get the filename part of given path regardless of the current OS, by
    dropping everything up to the last of each known path separator.

    :param path: path
    :return: filename
    :rtype: str or unicode (depending on given path)
    '''
    for separator in common_path_separators:
        # when separator is absent the whole path is kept
        path = path.rsplit(separator, 1)[-1]
    return path
415
416
417
def clean_restricted_chars(path, restricted_chars=restricted_chars):
    '''
    Replace every restricted character found in path by an underscore.

    :param path: path
    :param restricted_chars: characters to replace, defaults to the module
                             level restricted characters
    :return: path without restricted characters
    :rtype: str or unicode (depending on given path)
    '''
    cleaned = path
    for forbidden in restricted_chars:
        if forbidden in cleaned:
            cleaned = cleaned.replace(forbidden, '_')
    return cleaned
428
429
430
def check_forbidden_filename(filename,
                             destiny_os=os.name,
                             restricted_names=restricted_names):
    '''
    Check whether given filename is forbidden on the destination OS.

    A filename is forbidden if it is one of the restricted names or, when
    the destination is NT, if its first dot-separated component is a device
    name (case-insensitive).

    :param filename: filename to check
    :param destiny_os: destination operative system
    :param restricted_names: names forbidden on any OS
    :return: whether is forbidden on given OS or not
    :rtype: bool
    '''
    if filename in restricted_names:
        return True
    if destiny_os != 'nt':
        return False
    device = filename.split('.', 1)[0].upper()
    return device in nt_device_names
448
449
450
def check_under_base(path, base, os_sep=os.sep):
    '''
    Check whether given absolute path is the given base itself or is located
    below it.

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: whether path is under given base or not
    :rtype: bool
    '''
    if path == base:
        return True
    # compare against base plus separator so siblings sharing a name prefix
    # with base do not match
    if base.endswith(os_sep):
        return path.startswith(base)
    return path.startswith(base + os_sep)
461
462
463
def secure_filename(path, destiny_os=os.name, fs_encoding=compat.fs_encoding):
    '''
    Extract a safe filename from an untrusted path: parent path components
    are dropped and restricted characters replaced.

    If the resulting filename is forbidden, an empty string is returned.

    :param path: unsafe path
    :type: str
    :param destiny_os: destination operative system
    :type destiny_os: str
    :param fs_encoding: destination filesystem filename encoding
    :return: filename or empty string
    :rtype: str
    '''
    filename = clean_restricted_chars(generic_filename(path))

    if check_forbidden_filename(filename, destiny_os=destiny_os):
        return ''

    if isinstance(filename, bytes):
        filename = filename.decode('latin-1', errors=underscore_replace)

    # Round trip through the filesystem encoding so characters it cannot
    # represent get handled by the underscore error handler
    codec_options = {
        'os_name': destiny_os,
        'fs_encoding': fs_encoding,
        'errors': underscore_replace,
        }
    encoded = compat.fsencode(filename, **codec_options)
    return compat.fsdecode(encoded, **codec_options)
495
496
497
def alternative_filename(filename, attempt=None):
    '''
    Build an alternative name for given filename by inserting a marker
    between its name and its (up to two) extensions.

    The marker is the attempt number between parentheses if given, eight
    random filesystem-safe characters otherwise.

    :param filename: original filename
    :param attempt: optional attempt number, defaults to null
    :return: new filename
    :rtype: str or unicode
    '''
    parts = filename.rsplit(u'.', 2)
    stem, suffixes = parts[0], parts[1:]
    if attempt is not None:
        marker = u' (%d)' % attempt
    else:
        marker = u' %s' % ''.join(
            random.choice(fs_safe_characters) for _ in range(8)
            )
    extension = ''.join(u'.%s' % suffix for suffix in suffixes)
    return u'%s%s%s' % (stem, marker, extension)
518