Completed
Push — dev-4.1 ( c2f7a9...db8c57 )
by Felipe A.
01:12
created

Directory.download()   A

Complexity

Conditions 1

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 6
rs 9.4285
1
#!/usr/bin/env python
2
# -*- coding: UTF-8 -*-
3
4
import sys
5
import os
6
import os.path
7
import re
8
import shutil
9
import codecs
10
import threading
11
import string
12
import tarfile
13
import random
14
import datetime
15
import functools
16
import logging
17
import warnings
18
19
from flask import current_app, send_from_directory, Response
20
from werkzeug.utils import cached_property
21
22
from . import compat
23
from .compat import range
24
from .functional import psetattr
25
26
27
logger = logging.getLogger(__name__)
28
unicode_underscore = '_'.decode('utf-8') if compat.PY_LEGACY else '_'
29
underscore_replace = '%s:underscore' % __name__
30
codecs.register_error(underscore_replace,
31
                      lambda error: (unicode_underscore, error.start + 1)
32
                      )
33
binary_units = ("B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB")
34
standard_units = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
35
common_path_separators = '\\/'
36
restricted_chars = '\\/\0'
37
restricted_names = ('.', '..', '::', os.sep)
38
nt_device_names = ('CON', 'AUX', 'COM1', 'COM2', 'COM3', 'COM4', 'LPT1',
39
                   'LPT2', 'LPT3', 'PRN', 'NUL')
40
fs_safe_characters = string.ascii_uppercase + string.digits
41
42
43
class Node(object):
44
    directory_class = None  # set later at import time
45
    file_class = None  # set later at import time
46
47
    re_charset = re.compile('; charset=(?P<charset>[^;]+)')
48
    can_download = False
49
50
    @property
51
    def plugin_manager(self):
52
        return self.app.extensions['plugin_manager']
53
54
    @property
55
    def default_action(self):
56
        for action in self.actions:
57
            if action.widget.place == 'link':
58
                return action
59
60
    @cached_property
61
    def actions(self):
62
        return self.plugin_manager.get_actions(self)
63
64
    @cached_property
65
    def can_remove(self):
66
        dirbase = self.app.config["directory_remove"]
67
        return dirbase and self.path.startswith(dirbase + os.sep)
68
69
    @cached_property
70
    def stats(self):
71
        return os.stat(self.path)
72
73
    @cached_property
74
    def parent(self):
75
        if self.path == self.app.config['directory_base']:
76
            return None
77
        return self.directory_class(os.path.dirname(self.path), self.app)
78
79
    @cached_property
80
    def ancestors(self):
81
        ancestors = []
82
        parent = self.parent
83
        while parent:
84
            ancestors.append(parent)
85
            parent = parent.parent
86
        return ancestors
87
88
    @property
89
    def modified(self):
90
        dt = datetime.datetime.fromtimestamp(self.stats.st_mtime)
91
        return dt.strftime('%Y.%m.%d %H:%M:%S')
92
93
    @property
94
    def urlpath(self):
95
        return abspath_to_urlpath(self.path, self.app.config['directory_base'])
96
97
    @property
98
    def name(self):
99
        return os.path.basename(self.path)
100
101
    @property
102
    def type(self):
103
        return self.mimetype.split(";", 1)[0]
104
105
    def __init__(self, path=None, app=None, **defaults):
106
        self.path = compat.fsdecode(path) if path else None
107
        self.app = current_app if app is None else app
108
        self.__dict__.update(defaults)
109
110
    def remove(self):
111
        if not self.can_remove:
112
            raise OutsideRemovableBase("File outside removable base")
113
114
    @classmethod
115
    def from_urlpath(cls, path, app=None):
116
        '''
117
        Alternative constructor which accepts a path as taken from URL and uses
118
        the given app or the current app config to get the real path.
119
120
        :param path: relative path as from URL
121
        :param app: optional, flask application
122
        :return: file object pointing to path
123
        :rtype: File
124
        '''
125
        app = app or current_app
126
        base = app.config['directory_base']
127
        path = urlpath_to_abspath(path, base)
128
        kls = cls.directory_class if os.path.isdir(path) else cls.file_class
129
        return kls(path=path, app=app)
130
131
132
@psetattr(Node, 'file_class')
133
class File(Node):
134
    can_download = True
135
    can_upload = False
136
    is_directory = False
137
138
    @property
139
    def default_action(self):
140
        action = super(File, self).default_action
141
        if action is None:
142
            widget = self.plugin_manager.link_class.from_file(self)
143
            action = self.plugin_manager.action_class('open', widget)
144
        return action
145
146
    @cached_property
147
    def mimetype(self):
148
        return self.plugin_manager.get_mimetype(self.path)
149
150
    @cached_property
151
    def is_file(self):
152
        return os.path.isfile(self.path)
153
154
    @property
155
    def size(self):
156
        size, unit = fmt_size(
157
            self.stats.st_size,
158
            self.app.config["use_binary_multiples"]
159
            )
160
        if unit == binary_units[0]:
161
            return "%d %s" % (size, unit)
162
        return "%.2f %s" % (size, unit)
163
164
    @property
165
    def encoding(self):
166
        if ";" in self.mimetype:
167
            match = self.re_charset.search(self.mimetype)
168
            gdict = match.groupdict() if match else {}
169
            return gdict.get("charset") or "default"
170
        return "default"
171
172
    def remove(self):
173
        super(File, self).remove()
174
        os.unlink(self.path)
175
176
    def download(self):
177
        directory, name = os.path.split(self.path)
178
        return send_from_directory(directory, name, as_attachment=True)
179
180
181
@psetattr(Node, 'directory_class')
182
class Directory(Node):
183
    _listdir_cache = None
184
    mimetype = 'inode/directory'
185
    is_directory = True
186
    is_file = False
187
    size = 0
188
    encoding = 'default'
189
190
    @property
191
    def default_action(self):
192
        action = super(Directory, self).default_action
193
        if action is None:
194
            widget = self.plugin_manager.link_class.from_file(self)
195
            action = self.plugin_manager.action_class('browse', widget)
196
        return action
197
198
    @cached_property
199
    def can_download(self):
200
        return self.app.config['directory_downloadable']
201
202
    @cached_property
203
    def can_upload(self):
204
        dirbase = self.app.config["directory_upload"]
205
        return dirbase and (
206
            dirbase == self.path or
207
            self.path.startswith(dirbase + os.sep)
208
            )
209
210
    @cached_property
211
    def is_empty(self):
212
        if self._listdir_cache is not None:
213
            return bool(self._listdir_cache)
214
        for entry in compat.scandir(self.path):
215
            return False
216
        return True
217
218
    def remove(self):
219
        super(Directory, self).remove()
220
        shutil.rmtree(self.path)
221
222
    def download(self):
223
        stream = TarFileStream(
224
            self.path,
225
            self.app.config["directory_tar_buffsize"]
226
            )
227
        return Response(stream, mimetype="application/octet-stream")
228
229
    def contains(self, filename):
230
        return os.path.exists(os.path.join(self.path, filename))
231
232
    def choose_filename(self, filename, attempts=999):
233
        new_filename = filename
234
        for attempt in range(2, attempts+1):
235
            if not self.contains(new_filename):
236
                return new_filename
237
            new_filename = alternative_filename(filename, attempt)
238
        while self.contains(new_filename):
239
            new_filename = alternative_filename(filename)
240
        return new_filename
241
242
    def _listdir(self):
243
        '''
244
        Iter unsorted entries on this directory.
245
246
        :yields: Directory or File instance for each entry in directory
247
        :ytype: Node
248
        '''
249
        precomputed_stats = os.name == 'nt'
250
        for entry in compat.scandir(self.path):
251
            kwargs = {'path': entry.path, 'app': self.app, 'parent': self}
252
            if precomputed_stats and not entry.is_symlink():
253
                kwargs['stats'] = entry.stats()
254
            if entry.is_dir(follow_symlinks=True):
255
                yield self.directory_class(**kwargs)
256
                continue
257
            yield self.file_class(**kwargs)
258
259
    def listdir(self):
260
        '''
261
        Get sorted list (by `is_directory` and `name` properties) of File
262
        objects.
263
264
        :return: sorted list of File instances
265
        :rtype: list of File
266
        '''
267
        if self._listdir_cache is None:
268
            self._listdir_cache = sorted(
269
                self._listdir(),
270
                key=lambda f: (not f.is_directory, f.name.lower())
271
                )
272
        return self._listdir_cache
273
274
275
class TarFileStream(object):
276
    '''
277
    Tarfile which compresses while reading for streaming.
278
279
    Buffsize can be provided, it must be 512 multiple (the tar block size) for
280
    compression.
281
    '''
282
    event_class = threading.Event
283
    thread_class = threading.Thread
284
    tarfile_class = tarfile.open
285
286
    def __init__(self, path, buffsize=10240):
287
        self.path = path
288
        self.name = os.path.basename(path) + ".tgz"
289
290
        self._finished = 0
291
        self._want = 0
292
        self._data = bytes()
293
        self._add = self.event_class()
294
        self._result = self.event_class()
295
        self._tarfile = self.tarfile_class(
296
            fileobj=self,
297
            mode="w|gz",
298
            bufsize=buffsize
299
            )  # stream write
300
        self._th = self.thread_class(target=self.fill)
301
        self._th.start()
302
303
    def fill(self):
304
        self._tarfile.add(self.path, "")
305
        self._tarfile.close()  # force stream flush
306
        self._finished += 1
307
        if not self._result.is_set():
308
            self._result.set()
309
310
    def write(self, data):
311
        self._add.wait()
312
        self._data += data
313
        if len(self._data) > self._want:
314
            self._add.clear()
315
            self._result.set()
316
        return len(data)
317
318
    def read(self, want=0):
319
        if self._finished:
320
            if self._finished == 1:
321
                self._finished += 1
322
                return ""
323
            return EOFError("EOF reached")
324
325
        # Thread communication
326
        self._want = want
327
        self._add.set()
328
        self._result.wait()
329
        self._result.clear()
330
331
        if want:
332
            data = self._data[:want]
333
            self._data = self._data[want:]
334
        else:
335
            data = self._data
336
            self._data = bytes()
337
        return data
338
339
    def __iter__(self):
340
        data = self.read()
341
        while data:
342
            yield data
343
            data = self.read()
344
345
346
class OutsideDirectoryBase(Exception):
347
    pass
348
349
350
class OutsideRemovableBase(Exception):
351
    pass
352
353
354
def fmt_size(size, binary=True):
355
    '''
356
    Get size and unit.
357
358
    :param size: size in bytes
359
    :param binary: whether use binary or standard units, defaults to True
360
    :return: size and unit
361
    :rtype: tuple of int and unit as str
362
    '''
363
    if binary:
364
        fmt_sizes = binary_units
365
        fmt_divider = 1024.
366
    else:
367
        fmt_sizes = standard_units
368
        fmt_divider = 1000.
369
    for fmt in fmt_sizes[:-1]:
370
        if size < 1000:
371
            return (size, fmt)
372
        size /= fmt_divider
373
    return size, fmt_sizes[-1]
374
375
376
def relativize_path(path, base, os_sep=os.sep):
377
    '''
378
    Make absolute path relative to an absolute base.
379
380
    :param path: absolute path
381
    :param base: absolute base path
382
    :param os_sep: path component separator, defaults to current OS separator
383
    :return: relative path
384
    :rtype: str or unicode
385
    :raises OutsideDirectoryBase: if path is not below base
386
    '''
387
    if not check_under_base(path, base, os_sep):
388
        raise OutsideDirectoryBase("%r is not under %r" % (path, base))
389
    prefix_len = len(base)
390
    if not base.endswith(os_sep):
391
        prefix_len += len(os_sep)
392
    return path[prefix_len:]
393
394
395
def abspath_to_urlpath(path, base, os_sep=os.sep):
396
    '''
397
    Make filesystem absolute path uri relative using given absolute base path.
398
399
    :param path: absolute path
400
    :param base: absolute base path
401
    :param os_sep: path component separator, defaults to current OS separator
402
    :return: relative uri
403
    :rtype: str or unicode
404
    :raises OutsideDirectoryBase: if resulting path is not below base
405
    '''
406
    return relativize_path(path, base, os_sep).replace(os_sep, '/')
407
408
409
def urlpath_to_abspath(path, base, os_sep=os.sep):
410
    '''
411
    Make uri relative path fs absolute using a given absolute base path.
412
413
    :param path: relative path
414
    :param base: absolute base path
415
    :param os_sep: path component separator, defaults to current OS separator
416
    :return: absolute path
417
    :rtype: str or unicode
418
    :raises OutsideDirectoryBase: if resulting path is not below base
419
    '''
420
    prefix = base if base.endswith(os_sep) else base + os_sep
421
    realpath = os.path.abspath(prefix + path.replace('/', os_sep))
422
    if base == realpath or realpath.startswith(prefix):
423
        return realpath
424
    raise OutsideDirectoryBase("%r is not under %r" % (realpath, base))
425
426
427
def generic_filename(path):
428
    '''
429
    Extract filename of given path os-indepently, taking care of known path
430
    separators.
431
432
    :param path: path
433
    :return: filename
434
    :rtype: str or unicode (depending on given path)
435
    '''
436
437
    for sep in common_path_separators:
438
        if sep in path:
439
            _, path = path.rsplit(sep, 1)
440
    return path
441
442
443
def clean_restricted_chars(path, restricted_chars=restricted_chars):
444
    '''
445
    Get path without restricted characters.
446
447
    :param path: path
448
    :return: path without restricted characters
449
    :rtype: str or unicode (depending on given path)
450
    '''
451
    for character in restricted_chars:
452
        path = path.replace(character, '_')
453
    return path
454
455
456
def check_forbidden_filename(filename,
457
                             destiny_os=os.name,
458
                             restricted_names=restricted_names):
459
    '''
460
    Get if given filename is forbidden for current OS or filesystem.
461
462
    :param filename:
463
    :param destiny_os: destination operative system
464
    :param fs_encoding: destination filesystem filename encoding
465
    :return: wether is forbidden on given OS (or filesystem) or not
466
    :rtype: bool
467
    '''
468
    if destiny_os == 'nt':
469
        fpc = filename.split('.', 1)[0].upper()
470
        if fpc in nt_device_names:
471
            return True
472
473
    return filename in restricted_names
474
475
476
def check_under_base(path, base, os_sep=os.sep):
477
    '''
478
    Check if given absolute path is under given base.
479
480
    :param path: absolute path
481
    :param base: absolute base path
482
    :return: wether file is under given base or not
483
    :rtype: bool
484
    '''
485
    prefix = base if base.endswith(os_sep) else base + os_sep
486
    return path == base or path.startswith(prefix)
487
488
489
def secure_filename(path, destiny_os=os.name, fs_encoding=compat.fs_encoding):
490
    '''
491
    Get rid of parent path components and special filenames.
492
493
    If path is invalid or protected, return empty string.
494
495
    :param path: unsafe path
496
    :type: str
497
    :param destiny_os: destination operative system
498
    :type destiny_os: str
499
    :return: filename or empty string
500
    :rtype: str
501
    '''
502
    path = generic_filename(path)
503
    path = clean_restricted_chars(path)
504
505
    if check_forbidden_filename(path, destiny_os=destiny_os):
506
        return ''
507
508
    if isinstance(path, bytes):
509
        path = path.decode('latin-1', errors=underscore_replace)
510
511
    # Decode and recover from filesystem encoding in order to strip unwanted
512
    # characters out
513
    kwargs = dict(
514
        os_name=destiny_os,
515
        fs_encoding=fs_encoding,
516
        errors=underscore_replace
517
        )
518
    fs_encoded_path = compat.fsencode(path, **kwargs)
519
    fs_decoded_path = compat.fsdecode(fs_encoded_path, **kwargs)
520
    return fs_decoded_path
521
522
523
def alternative_filename(filename, attempt=None):
524
    '''
525
    Generates an alternative version of given filename.
526
527
    If an number attempt parameter is given, will be used on the alternative
528
    name, a random value will be used otherwise.
529
530
    :param filename: original filename
531
    :param attempt: optional attempt number, defaults to null
532
    :return: new filename
533
    :rtype: str or unicode
534
    '''
535
    filename_parts = filename.rsplit(u'.', 2)
536
    name = filename_parts[0]
537
    ext = ''.join(u'.%s' % ext for ext in filename_parts[1:])
538
    if attempt is None:
539
        choose = random.choice
540
        extra = u' %s' % ''.join(choose(fs_safe_characters) for i in range(8))
541
    else:
542
        extra = u' (%d)' % attempt
543
    return u'%s%s%s' % (name, extra, ext)
544