Completed
Push — dev-4.1-unstable ( 136355...f650fd )
by Felipe A.
01:01
created

Node.link()   A

Complexity

Conditions 3

Size

Total Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
dl 0
loc 5
rs 9.4285
c 0
b 0
f 0
1
#!/usr/bin/env python
2
# -*- coding: UTF-8 -*-
3
4
import os
5
import os.path
6
import re
7
import shutil
8
import codecs
9
import threading
10
import string
11
import tarfile
12
import random
13
import datetime
14
import logging
15
16
from flask import current_app, send_from_directory
17
from werkzeug.utils import cached_property
18
19
from . import compat
20
from .compat import range
21
22
23
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Underscore as a text (unicode) object on both legacy and current Python.
if compat.PY_LEGACY:
    unicode_underscore = '_'.decode('utf-8')
else:
    unicode_underscore = '_'

# Name under which the codec error handler below is registered.
underscore_replace = '%s:underscore' % __name__


def _underscore_error_handler(error):
    '''
    Codec error handler which substitutes an underscore.

    :param error: encoding/decoding error given by the codec machinery
    :return: replacement text and the position where processing resumes
             (one past the start of the offending data)
    :rtype: tuple
    '''
    return (unicode_underscore, error.start + 1)


codecs.register_error(underscore_replace, _underscore_error_handler)
29
# Unit suffixes used by fmt_size, smallest first.
binary_units = ("B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB")
standard_units = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")

# Characters treated as path separators regardless of the running OS.
common_path_separators = '\\/'

# Characters replaced by an underscore when cleaning filenames.
restricted_chars = '\\/\0'

# Names which are never acceptable as a filename.
restricted_names = ('.', '..', '::', os.sep)

# Reserved device names on Windows: CON, PRN, AUX, NUL plus the whole
# COM1-COM9 and LPT1-LPT9 ranges (the previous list stopped at COM4/LPT3, so
# names such as COM7 or LPT5 were not detected as forbidden).
nt_device_names = (
    ('CON', 'PRN', 'AUX', 'NUL') +
    tuple('COM%d' % number for number in range(1, 10)) +
    tuple('LPT%d' % number for number in range(1, 10))
    )

# Alphabet used for random filename suffixes.
fs_safe_characters = string.ascii_uppercase + string.digits
37
38
39
class Node(object):
    '''
    Base class for filesystem entries, holding the behavior shared by the
    registered file and directory classes.

    While `generic` is True, :meth:`from_urlpath` picks `directory_class` or
    `file_class` depending on what the path points to.
    '''
    generic = True
    directory_class = None  # set later at import time
    file_class = None  # set later at import time

    re_charset = re.compile('; charset=(?P<charset>[^;]+)')
    can_download = False

    def __init__(self, path=None, app=None, **defaults):
        '''
        :param path: filesystem path, decoded with compat.fsdecode when given
        :param app: optional, flask application (current app otherwise)
        :param defaults: extra attributes copied straight into the instance
                         `__dict__` (`Directory._listdir` passes `parent` and
                         `stats` this way)
        '''
        self.path = compat.fsdecode(path) if path else None
        self.app = current_app if app is None else app
        self.__dict__.update(defaults)

    @property
    def plugin_manager(self):
        '''
        Plugin manager taken from the application extensions.
        '''
        return self.app.extensions['plugin_manager']

    @cached_property
    def widgets(self):
        '''
        Widgets given by the plugin manager for this node, plus a remove
        button when the node can be removed.
        '''
        manager = self.plugin_manager
        found = manager.get_widgets(file=self)
        if self.can_remove:
            found.append(
                manager.widget_class(
                    'entry-actions',
                    'button',
                    css='remove',
                    endpoint='remove'
                    )
                )
        return found

    @cached_property
    def link(self):
        '''
        First widget placed at 'entry-link', or None if there is not any.
        '''
        candidates = (
            widget
            for widget in self.widgets
            if widget.place == 'entry-link'
            )
        return next(candidates, None)

    @cached_property
    def can_remove(self):
        '''
        Truthy when the "directory_remove" option is set and this node path
        is located below it.
        '''
        removable_base = self.app.config["directory_remove"]
        if not removable_base:
            return removable_base
        return self.path.startswith(removable_base + os.sep)

    @cached_property
    def stats(self):
        '''
        Result of os.stat for this node path.
        '''
        return os.stat(self.path)

    @cached_property
    def parent(self):
        '''
        Parent directory node, or None for the "directory_base" path and for
        nodes without a path or without parent directory name.
        '''
        if self.path == self.app.config['directory_base']:
            return None
        if not self.path:
            return None
        dirname = os.path.dirname(self.path)
        if not dirname:
            return None
        return self.directory_class(dirname, self.app)

    @cached_property
    def ancestors(self):
        '''
        List of parent nodes, nearest first.
        '''
        chain = []
        current = self.parent
        while current:
            chain.append(current)
            current = current.parent
        return chain

    @property
    def modified(self):
        '''
        Modification time formatted as 'YYYY.mm.dd HH:MM:SS'.
        '''
        moment = datetime.datetime.fromtimestamp(self.stats.st_mtime)
        return moment.strftime('%Y.%m.%d %H:%M:%S')

    @property
    def urlpath(self):
        '''
        Path relative to "directory_base", using '/' as separator.
        '''
        base = self.app.config['directory_base']
        return abspath_to_urlpath(self.path, base)

    @property
    def name(self):
        '''
        Last component of the node path.
        '''
        return os.path.basename(self.path)

    @property
    def type(self):
        '''
        Mimetype without its parameters (anything after the first ';').
        '''
        pieces = self.mimetype.split(";", 1)
        return pieces[0]

    def remove(self):
        '''
        Check this node is allowed to be removed; the removal itself is left
        to subclasses.

        :raises OutsideRemovableBase: when not under removable base directory
        '''
        if not self.can_remove:
            raise OutsideRemovableBase("File outside removable base")

    @classmethod
    def from_urlpath(cls, path, app=None):
        '''
        Alternative constructor which accepts a path as taken from URL and uses
        the given app or the current app config to get the real path.

        If class has attribute `generic` set to True, `directory_class` or
        `file_class` will be used as type.

        :param path: relative path as from URL
        :param app: optional, flask application
        :return: file object pointing to path
        :rtype: File
        '''
        app = app or current_app
        abspath = urlpath_to_abspath(path, app.config['directory_base'])
        if cls.generic:
            if os.path.isdir(abspath):
                kls = cls.directory_class
            else:
                kls = cls.file_class
        else:
            kls = cls
        return kls(path=abspath, app=app)

    @classmethod
    def register_file_class(cls, kls):
        '''
        Class decorator: set given class as `file_class`.

        :param kls: class used for files
        :return: given class, unchanged
        '''
        cls.file_class = kls
        return kls

    @classmethod
    def register_directory_class(cls, kls):
        '''
        Class decorator: set given class as `directory_class`.

        :param kls: class used for directories
        :return: given class, unchanged
        '''
        cls.directory_class = kls
        return kls
155
156
157
@Node.register_file_class
class File(Node):
    '''
    Node pointing to a regular file.
    '''
    can_download = True
    can_upload = False
    is_directory = False
    generic = False

    @cached_property
    def widgets(self):
        '''
        Node widgets plus a download button when file can be downloaded.
        '''
        inherited = super(File, self).widgets
        if not self.can_download:
            return inherited
        inherited.append(
            self.plugin_manager.widget_class(
                'entry-actions',
                'button',
                css='download',
                endpoint='download_file'
                )
            )
        return inherited

    @property
    def link(self):
        '''
        Link widget found among the node widgets, or a default link to the
        'open' endpoint when there is not any.
        '''
        found = super(File, self).link
        if found is not None:
            return found
        return self.plugin_manager.widget_class(
            'entry-link',
            'link',
            text=self.name,
            icon='file-icon',
            endpoint='open'
            )

    @cached_property
    def mimetype(self):
        '''
        Mimetype as given by the plugin manager for this path.
        '''
        return self.plugin_manager.get_mimetype(self.path)

    @cached_property
    def is_file(self):
        '''
        Whether path points to an existing regular file.
        '''
        return os.path.isfile(self.path)

    @property
    def size(self):
        '''
        Human readable file size: an integer for the smallest unit, two
        decimals for any other.
        '''
        amount, unit = fmt_size(
            self.stats.st_size,
            self.app.config["use_binary_multiples"]
            )
        template = "%d %s" if unit == binary_units[0] else "%.2f %s"
        return template % (amount, unit)

    @property
    def encoding(self):
        '''
        Charset declared on the mimetype, "default" if not declared.
        '''
        if ";" not in self.mimetype:
            return "default"
        found = self.re_charset.search(self.mimetype)
        charset = found.group("charset") if found else None
        return charset or "default"

    def remove(self):
        '''
        Remove file.

        :raises OutsideRemovableBase: when not under removable base directory
        '''
        super(File, self).remove()
        os.unlink(self.path)

    def download(self):
        '''
        Get a Flask Response object, made with send_from_directory, sending
        this file as attachment.

        :returns: Response object as returned by flask's send_from_directory
        :rtype: flask.Response
        '''
        dirname, basename = os.path.split(self.path)
        return send_from_directory(dirname, basename, as_attachment=True)
233
234
235
@Node.register_directory_class
class Directory(Node):
    '''
    Node pointing to a directory.
    '''
    _listdir_cache = None
    mimetype = 'inode/directory'
    is_file = False
    size = 0
    encoding = 'default'
    generic = False

    @cached_property
    def widgets(self):
        '''
        Node widgets plus a download button when directory can be downloaded.
        '''
        # super() must be given this class: super(File, self) raises
        # TypeError since a Directory instance is not a File
        widgets = super(Directory, self).widgets
        if self.can_download:
            action = self.plugin_manager.widget_class(
                'entry-actions',
                'button',
                css='download',
                endpoint='download_directory'
                )
            # add the button to the widget list (not to the button itself)
            widgets.append(action)
        return widgets

    @property
    def link(self):
        '''
        Link widget found among the node widgets, or a default link to the
        'browse' endpoint when there is not any.
        '''
        widget = super(Directory, self).link
        if widget is None:
            return self.plugin_manager.widget_class(
                'entry-link',
                'link',
                text=self.name,
                icon='dir-icon',
                endpoint='browse'
                )
        return widget

    @cached_property
    def is_directory(self):
        '''
        Whether path points to an existing directory.
        '''
        return os.path.isdir(self.path)

    @cached_property
    def can_download(self):
        '''
        Value of the "directory_downloadable" application option.
        '''
        return self.app.config['directory_downloadable']

    @cached_property
    def can_upload(self):
        '''
        Truthy when the "directory_upload" option is set and this directory
        is that path or is located below it.
        '''
        dirbase = self.app.config["directory_upload"]
        return dirbase and (
            dirbase == self.path or
            self.path.startswith(dirbase + os.sep)
            )

    @cached_property
    def is_empty(self):
        '''
        Whether directory has no entries.

        The cached listing is used when available, otherwise directory is
        scanned only until its first entry.
        '''
        if self._listdir_cache is not None:
            # an empty cached listing means an empty directory
            return not self._listdir_cache
        for entry in self._listdir():
            return False
        return True

    def remove(self):
        '''
        Remove directory tree.

        :raises OutsideRemovableBase: when not under removable base directory
        '''
        super(Directory, self).remove()
        shutil.rmtree(self.path)

    def download(self):
        '''
        Get a Flask Response object streaming a tarball of this directory.

        :returns: Response object
        :rtype: flask.Response
        '''
        return self.app.response_class(
            TarFileStream(
                self.path,
                self.app.config["directory_tar_buffsize"]
                ),
            mimetype="application/octet-stream"
            )

    def contains(self, filename):
        '''
        Check if directory contains an entry with given filename.

        :param filename: filename to check
        :type filename: str
        :returns: True if exists, False otherwise.
        :rtype: bool
        '''
        return os.path.exists(os.path.join(self.path, filename))

    def choose_filename(self, filename, attempts=999):
        '''
        Get a new filename which does not collide with any entry on directory,
        based on given filename.

        Given filename is tried first, then numbered alternatives and, once
        those are exhausted, random alternatives until a free one is found.

        :param filename: base filename
        :type filename: str
        :param attempts: number of attempts, defaults to 999
        :type attempts: int
        :returns: filename
        :rtype: str
        '''
        new_filename = filename
        for attempt in range(2, attempts + 1):
            if not self.contains(new_filename):
                return new_filename
            new_filename = alternative_filename(filename, attempt)
        while self.contains(new_filename):
            new_filename = alternative_filename(filename)
        return new_filename

    def _listdir(self):
        '''
        Iter unsorted entries on this directory.

        :yields: Directory or File instance for each entry in directory
        :ytype: Node
        '''
        # stat results are only forwarded on Windows
        # NOTE(review): presumably because scandir already holds them there
        # without an extra system call - confirm against compat.scandir
        precomputed_stats = os.name == 'nt'
        for entry in compat.scandir(self.path):
            kwargs = {'path': entry.path, 'app': self.app, 'parent': self}
            if precomputed_stats and not entry.is_symlink():
                # scandir entries expose stat(), there is no stats() method
                kwargs['stats'] = entry.stat()
            if entry.is_dir(follow_symlinks=True):
                yield self.directory_class(**kwargs)
                continue
            yield self.file_class(**kwargs)

    def listdir(self, sortkey=None, reverse=False):
        '''
        Get list of entries on this directory.

        Listing is computed once and cached: later calls return the very
        same list regardless of the given parameters.

        :param sortkey: optional, key function used to sort entries
        :param reverse: whether reverse the resulting order, defaults to False
        :return: list of Directory and File instances
        :rtype: list of Node
        '''
        if self._listdir_cache is None:
            if sortkey:
                data = sorted(self._listdir(), key=sortkey, reverse=reverse)
            else:
                data = list(self._listdir())
                if reverse:
                    # reversed() cannot take a generator, so the list is
                    # built first and reversed in place
                    data.reverse()
            self._listdir_cache = data
        return self._listdir_cache
384
385
386
class TarFileStream(object):
    '''
    Tarfile which compresses while reading for streaming.

    Buffsize can be provided, it must be 512 multiple (the tar block size) for
    compression.

    A worker thread, started on instantiation, writes the gzipped tarball
    into this object through :meth:`write`, while the consumer gets it from
    :meth:`read` (or by iterating). Two events are used to hand control from
    one to the other.
    '''
    event_class = threading.Event
    thread_class = threading.Thread
    tarfile_class = tarfile.open

    def __init__(self, path, buffsize=10240):
        '''
        :param path: path of the directory (or file) to compress
        :param buffsize: tarfile buffer size, defaults to 10240
        '''
        self.path = path
        self.name = os.path.basename(path) + ".tgz"

        self._finished = 0  # 0: working, 1: worker done, 2: EOF delivered
        self._want = 0  # amount of bytes asked by the current read
        self._data = bytes()  # compressed data waiting to be read
        self._add = self.event_class()  # set while writer is allowed to add
        self._result = self.event_class()  # set when reader can go on
        self._tarfile = self.tarfile_class(  # stream write
            fileobj=self,
            mode="w|gz",
            bufsize=buffsize
            )
        self._th = self.thread_class(target=self.fill)
        self._th.start()

    def fill(self):
        '''
        Worker thread target: add path to the tarfile, close it and wake up
        any pending read.
        '''
        self._tarfile.add(self.path, "")
        self._tarfile.close()  # force stream flush
        self._finished += 1
        if not self._result.is_set():
            self._result.set()

    def write(self, data):
        '''
        File-like write, called by the tarfile object from the worker thread.

        Blocks until a read is waiting for data, then buffers given data and
        wakes up the reader once more than the wanted amount is available.

        :param data: compressed data
        :type data: bytes
        :returns: number of bytes taken
        :rtype: int
        '''
        self._add.wait()
        self._data += data
        if len(self._data) > self._want:
            self._add.clear()
            self._result.set()
        return len(data)

    def read(self, want=0):
        '''
        Read compressed data, waiting for the worker thread to provide it.

        :param want: maximum number of bytes, 0 (default) for all available
        :type want: int
        :returns: compressed data, empty once when stream is finished
        :rtype: bytes
        :raises EOFError: when reading after the end of stream was delivered
        '''
        if self._finished:
            if self._finished == 1:
                self._finished += 1
                # end of stream marker, same type as any other chunk
                return bytes()
            # exception must be raised: returning the instance handed a
            # truthy non-bytes object to the caller as if it was data
            raise EOFError("EOF reached")

        # Thread communication
        self._want = want
        self._add.set()
        self._result.wait()
        self._result.clear()

        if want:
            data = self._data[:want]
            self._data = self._data[want:]
        else:
            data = self._data
            self._data = bytes()
        return data

    def __iter__(self):
        '''
        Iterate over compressed data chunks until stream is exhausted.

        :yields: compressed data
        :ytype: bytes
        '''
        data = self.read()
        while data:
            yield data
            data = self.read()
455
456
457
class OutsideDirectoryBase(Exception):
    '''
    Raised when a path is not located under the expected base directory.
    '''
459
460
461
class OutsideRemovableBase(Exception):
    '''
    Raised when trying to remove a node which is not located under the
    removable base directory.
    '''
463
464
465
def fmt_size(size, binary=True):
    '''
    Scale a size in bytes down to the first unit where it is below 1000.

    :param size: size in bytes
    :param binary: whether use binary or standard units, defaults to True
    :return: scaled size and its unit
    :rtype: tuple of number and unit as str
    '''
    if binary:
        units, divider = binary_units, 1024.
    else:
        units, divider = standard_units, 1000.
    largest = len(units) - 1
    position = 0
    # the threshold is 1000 for both unit families
    while position < largest and not size < 1000:
        size /= divider
        position += 1
    return (size, units[position])
485
486
487
def relativize_path(path, base, os_sep=os.sep):
    '''
    Strip an absolute base (and the separator after it) from an absolute
    path.

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: relative path
    :rtype: str or unicode
    :raises OutsideDirectoryBase: if path is not below base
    '''
    if not check_under_base(path, base, os_sep):
        raise OutsideDirectoryBase("%r is not under %r" % (path, base))
    if base.endswith(os_sep):
        skip = len(base)
    else:
        # also drop the separator between base and the remaining path
        skip = len(base) + len(os_sep)
    return path[skip:]
504
505
506
def abspath_to_urlpath(path, base, os_sep=os.sep):
    '''
    Convert a filesystem absolute path into an uri path relative to the
    given absolute base path.

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: relative uri
    :rtype: str or unicode
    :raises OutsideDirectoryBase: if resulting path is not below base
    '''
    relative = relativize_path(path, base, os_sep)
    return relative.replace(os_sep, '/')
518
519
520
def urlpath_to_abspath(path, base, os_sep=os.sep):
    '''
    Convert an uri relative path into a filesystem absolute path located
    under the given absolute base path.

    :param path: relative path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: absolute path
    :rtype: str or unicode
    :raises OutsideDirectoryBase: if resulting path is not below base
    '''
    root = base if base.endswith(os_sep) else base + os_sep
    candidate = os.path.abspath(root + path.replace('/', os_sep))
    # path is normalized before the check, so '..' components cannot be used
    # to leave the base directory
    if candidate != base and not candidate.startswith(root):
        raise OutsideDirectoryBase("%r is not under %r" % (candidate, base))
    return candidate
536
537
538
def generic_filename(path):
    '''
    Get the filename of given path regardless of the OS, dropping anything
    before the last occurrence of every known path separator.

    :param path: path
    :return: filename
    :rtype: str or unicode (depending on given path)
    '''
    for separator in common_path_separators:
        # the last piece is the whole path when separator is not present
        path = path.rsplit(separator, 1)[-1]
    return path
552
553
554
def clean_restricted_chars(path, restricted_chars=restricted_chars):
    '''
    Replace every restricted character on path with an underscore.

    :param path: path
    :param restricted_chars: characters to replace, defaults to the module
                             level `restricted_chars`
    :return: path without restricted characters
    :rtype: str or unicode (depending on given path)
    '''
    cleaned = path
    for forbidden in restricted_chars:
        cleaned = cleaned.replace(forbidden, '_')
    return cleaned
565
566
567
def check_forbidden_filename(filename,
                             destiny_os=os.name,
                             restricted_names=restricted_names):
    '''
    Get if given filename is forbidden for given OS.

    On 'nt', a filename whose part before the first dot is a device name
    (case-insensitive) is forbidden. Restricted names are forbidden on
    every OS.

    :param filename: filename to check
    :param destiny_os: destination operative system
    :param restricted_names: names which are always forbidden, defaults to
                             the module level `restricted_names`
    :return: whether is forbidden on given OS or not
    :rtype: bool
    '''
    is_device = (
        destiny_os == 'nt' and
        filename.split('.', 1)[0].upper() in nt_device_names
        )
    return is_device or filename in restricted_names
585
586
587
def check_under_base(path, base, os_sep=os.sep):
    '''
    Check if given absolute path is the given base or is located under it.

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: whether path is under given base or not
    :rtype: bool
    '''
    if base.endswith(os_sep):
        prefix = base
    else:
        # trailing separator prevents matching siblings sharing a name prefix
        prefix = base + os_sep
    if path == base:
        return True
    return path.startswith(prefix)
598
599
600
def secure_filename(path, destiny_os=os.name, fs_encoding=compat.fs_encoding):
    '''
    Get rid of parent path components and special filenames.

    If path is invalid or protected, return empty string.

    :param path: unsafe path
    :type: str
    :param destiny_os: destination operative system
    :type destiny_os: str
    :param fs_encoding: destination filesystem filename encoding
    :return: filename or empty string
    :rtype: str
    '''
    candidate = clean_restricted_chars(generic_filename(path))

    if check_forbidden_filename(candidate, destiny_os=destiny_os):
        return ''

    if isinstance(candidate, bytes):
        candidate = candidate.decode('latin-1', errors=underscore_replace)

    # Round trip through the filesystem encoding: anything it cannot
    # represent is handled by the underscore error handler
    codec_options = {
        'os_name': destiny_os,
        'fs_encoding': fs_encoding,
        'errors': underscore_replace,
        }
    encoded = compat.fsencode(candidate, **codec_options)
    return compat.fsdecode(encoded, **codec_options)
632
633
634
def alternative_filename(filename, attempt=None):
    '''
    Generates an alternative version of given filename.

    The alternative is built by inserting either the attempt number, as
    ' (N)', or eight random characters right before the last two dotted
    extensions of filename.

    :param filename: original filename
    :param attempt: optional attempt number, random characters are used if
                    not given
    :return: new filename
    :rtype: str or unicode
    '''
    pieces = filename.rsplit(u'.', 2)
    stem, suffixes = pieces[0], pieces[1:]
    if attempt is not None:
        marker = u' (%d)' % attempt
    else:
        letters = [random.choice(fs_safe_characters) for _ in range(8)]
        marker = u' %s' % ''.join(letters)
    extension = ''.join(u'.%s' % suffix for suffix in suffixes)
    return u'%s%s%s' % (stem, marker, extension)
655