Completed
Push — dev-4.1-unstable ( f650fd...fbeeec )
by Felipe A.
56s
created

Node.widgets()   B

Complexity

Conditions 2

Size

Total Lines 26

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
dl 0
loc 26
rs 8.8571
c 0
b 0
f 0
1
#!/usr/bin/env python
2
# -*- coding: UTF-8 -*-
3
4
import os
5
import os.path
6
import re
7
import shutil
8
import codecs
9
import threading
10
import string
11
import tarfile
12
import random
13
import datetime
14
import logging
15
16
from flask import current_app, send_from_directory
17
from werkzeug.utils import cached_property
18
19
from . import compat
20
from .compat import range
21
22
23
logger = logging.getLogger(__name__)
24
unicode_underscore = '_'.decode('utf-8') if compat.PY_LEGACY else '_'
25
underscore_replace = '%s:underscore' % __name__
26
codecs.register_error(underscore_replace,
27
                      lambda error: (unicode_underscore, error.start + 1)
28
                      )
29
binary_units = ("B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB")
30
standard_units = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
31
common_path_separators = '\\/'
32
restricted_chars = '\\/\0'
33
restricted_names = ('.', '..', '::', os.sep)
34
nt_device_names = ('CON', 'AUX', 'COM1', 'COM2', 'COM3', 'COM4', 'LPT1',
35
                   'LPT2', 'LPT3', 'PRN', 'NUL')
36
fs_safe_characters = string.ascii_uppercase + string.digits
37
38
39
class Node(object):
40
    generic = True
41
    directory_class = None  # set later at import time
42
    file_class = None  # set later at import time
43
44
    re_charset = re.compile('; charset=(?P<charset>[^;]+)')
45
    can_download = False
46
47
    @property
48
    def plugin_manager(self):
49
        return self.app.extensions['plugin_manager']
50
51
    @cached_property
52
    def widgets(self):
53
        widgets = self.plugin_manager.get_widgets(file=self)
54
        widgets.extend((
55
            self.plugin_manager.create_widget(
56
                'head',
57
                'script',
58
                endpoint='static',
59
                filename='browse.body.js'
60
                ),
61
            self.plugin_manager.create_widget(
62
                'scripts',
63
                'script',
64
                endpoint='static',
65
                filename='browse.body.js'
66
                )
67
        ))
68
        if self.can_remove:
69
            action = self.plugin_manager.create_widget(
70
                'entry-actions',
71
                'button',
72
                css='remove',
73
                endpoint='remove'
74
                )
75
            widgets.append(action)
76
        return widgets
77
78
    @cached_property
79
    def link(self):
80
        for widget in self.widgets:
81
            if widget.place == 'entry-link':
82
                return widget
83
84
    @cached_property
85
    def can_remove(self):
86
        dirbase = self.app.config["directory_remove"]
87
        return dirbase and self.path.startswith(dirbase + os.sep)
88
89
    @cached_property
90
    def stats(self):
91
        return os.stat(self.path)
92
93
    @cached_property
94
    def parent(self):
95
        if self.path == self.app.config['directory_base']:
96
            return None
97
        parent = os.path.dirname(self.path) if self.path else None
98
        return self.directory_class(parent, self.app) if parent else None
99
100
    @cached_property
101
    def ancestors(self):
102
        ancestors = []
103
        parent = self.parent
104
        while parent:
105
            ancestors.append(parent)
106
            parent = parent.parent
107
        return ancestors
108
109
    @property
110
    def modified(self):
111
        dt = datetime.datetime.fromtimestamp(self.stats.st_mtime)
112
        return dt.strftime('%Y.%m.%d %H:%M:%S')
113
114
    @property
115
    def urlpath(self):
116
        return abspath_to_urlpath(self.path, self.app.config['directory_base'])
117
118
    @property
119
    def name(self):
120
        return os.path.basename(self.path)
121
122
    @property
123
    def type(self):
124
        return self.mimetype.split(";", 1)[0]
125
126
    def __init__(self, path=None, app=None, **defaults):
127
        self.path = compat.fsdecode(path) if path else None
128
        self.app = current_app if app is None else app
129
        self.__dict__.update(defaults)
130
131
    def remove(self):
132
        if not self.can_remove:
133
            raise OutsideRemovableBase("File outside removable base")
134
135
    @classmethod
136
    def from_urlpath(cls, path, app=None):
137
        '''
138
        Alternative constructor which accepts a path as taken from URL and uses
139
        the given app or the current app config to get the real path.
140
141
        If class has attribute `generic` set to True, `directory_class` or
142
        `file_class` will be used as type.
143
144
        :param path: relative path as from URL
145
        :param app: optional, flask application
146
        :return: file object pointing to path
147
        :rtype: File
148
        '''
149
        app = app or current_app
150
        base = app.config['directory_base']
151
        path = urlpath_to_abspath(path, base)
152
        if not cls.generic:
153
            kls = cls
154
        elif os.path.isdir(path):
155
            kls = cls.directory_class
156
        else:
157
            kls = cls.file_class
158
        return kls(path=path, app=app)
159
160
    @classmethod
161
    def register_file_class(cls, kls):
162
        cls.file_class = kls
163
        return kls
164
165
    @classmethod
166
    def register_directory_class(cls, kls):
167
        cls.directory_class = kls
168
        return kls
169
170
171
@Node.register_file_class
172
class File(Node):
173
    can_download = True
174
    can_upload = False
175
    is_directory = False
176
    generic = False
177
178
    @cached_property
179
    def widgets(self):
180
        widgets = super(File, self).widgets
181
        if self.can_download:
182
            action = self.plugin_manager.create_widget(
183
                'entry-actions',
184
                'button',
185
                css='download',
186
                endpoint='download_file'
187
                )
188
            widgets.append(action)
189
        return widgets
190
191
    @property
192
    def link(self):
193
        widget = super(File, self).link
194
        if widget is None:
195
            return self.plugin_manager.create_widget(
196
                'entry-link',
197
                'link',
198
                text=self.name,
199
                icon='file-icon',
200
                endpoint='open'
201
                )
202
        return widget
203
204
    @cached_property
205
    def mimetype(self):
206
        return self.plugin_manager.get_mimetype(self.path)
207
208
    @cached_property
209
    def is_file(self):
210
        return os.path.isfile(self.path)
211
212
    @property
213
    def size(self):
214
        size, unit = fmt_size(
215
            self.stats.st_size,
216
            self.app.config["use_binary_multiples"]
217
            )
218
        if unit == binary_units[0]:
219
            return "%d %s" % (size, unit)
220
        return "%.2f %s" % (size, unit)
221
222
    @property
223
    def encoding(self):
224
        if ";" in self.mimetype:
225
            match = self.re_charset.search(self.mimetype)
226
            gdict = match.groupdict() if match else {}
227
            return gdict.get("charset") or "default"
228
        return "default"
229
230
    def remove(self):
231
        '''
232
        Remove file.
233
        :raises OutsideRemovableBase: when not under removable base directory
234
        '''
235
        super(File, self).remove()
236
        os.unlink(self.path)
237
238
    def download(self):
239
        '''
240
        Get a Flask's send_file Response object pointing to this file.
241
242
        :returns: Response object as returned by flask's send_file
243
        :rtype: flask.Response
244
        '''
245
        directory, name = os.path.split(self.path)
246
        return send_from_directory(directory, name, as_attachment=True)
247
248
249
@Node.register_directory_class
250
class Directory(Node):
251
    _listdir_cache = None
252
    mimetype = 'inode/directory'
253
    is_file = False
254
    size = 0
255
    encoding = 'default'
256
    generic = False
257
258
    @cached_property
259
    def widgets(self):
260
        widgets = super(Directory, self).widgets
261
        if self.can_upload:
262
            action = self.plugin_manager.create_widget(
263
                'header',
264
                'upload',
265
                text='Upload',
266
                endpoint='upload'
267
                )
268
            widgets.append(action)
269
        if self.can_download:
270
            action = self.plugin_manager.create_widget(
271
                'entry-actions',
272
                'button',
273
                css='download',
274
                endpoint='download_directory'
275
                )
276
            widgets.append(action)
277
        return widgets
278
279
    @property
280
    def link(self):
281
        widget = super(Directory, self).link
282
        if widget is None:
283
            return self.plugin_manager.create_widget(
284
                'entry-link',
285
                'link',
286
                text=self.name,
287
                icon='dir-icon',
288
                endpoint='browse'
289
                )
290
        return widget
291
292
    @cached_property
293
    def is_directory(self):
294
        return os.path.isdir(self.path)
295
296
    @cached_property
297
    def can_download(self):
298
        return self.app.config['directory_downloadable']
299
300
    @cached_property
301
    def can_upload(self):
302
        dirbase = self.app.config["directory_upload"]
303
        return dirbase and (
304
            dirbase == self.path or
305
            self.path.startswith(dirbase + os.sep)
306
            )
307
308
    @cached_property
309
    def is_empty(self):
310
        if self._listdir_cache is not None:
311
            return bool(self._listdir_cache)
312
        for entry in self._listdir():
313
            return False
314
        return True
315
316
    def remove(self):
317
        '''
318
        Remove directory tree.
319
320
        :raises OutsideRemovableBase: when not under removable base directory
321
        '''
322
        super(Directory, self).remove()
323
        shutil.rmtree(self.path)
324
325
    def download(self):
326
        '''
327
        Get a Flask Response object streaming a tarball of this directory.
328
329
        :returns: Response object
330
        :rtype: flask.Response
331
        '''
332
        return self.app.response_class(
333
            TarFileStream(
334
                self.path,
335
                self.app.config["directory_tar_buffsize"]
336
                ),
337
            mimetype="application/octet-stream"
338
            )
339
340
    def contains(self, filename):
341
        '''
342
        Check if directory contains an entry with given filename.
343
344
        :param filename: filename will be check
345
        :type filename: str
346
        :returns: True if exists, False otherwise.
347
        :rtype: bool
348
        '''
349
        return os.path.exists(os.path.join(self.path, filename))
350
351
    def choose_filename(self, filename, attempts=999):
352
        '''
353
        Get a new filename which does not colide with any entry on directory,
354
        based on given filename.
355
356
        :param filename: base filename
357
        :type filename: str
358
        :param attempts: number of attempts, defaults to 999
359
        :type attempts: int
360
        :returns: filename
361
        :rtype: str
362
        '''
363
        new_filename = filename
364
        for attempt in range(2, attempts + 1):
365
            if not self.contains(new_filename):
366
                return new_filename
367
            new_filename = alternative_filename(filename, attempt)
368
        while self.contains(new_filename):
369
            new_filename = alternative_filename(filename)
370
        return new_filename
371
372
    def _listdir(self):
373
        '''
374
        Iter unsorted entries on this directory.
375
376
        :yields: Directory or File instance for each entry in directory
377
        :ytype: Node
378
        '''
379
        precomputed_stats = os.name == 'nt'
380
        for entry in compat.scandir(self.path):
381
            kwargs = {'path': entry.path, 'app': self.app, 'parent': self}
382
            if precomputed_stats and not entry.is_symlink():
383
                kwargs['stats'] = entry.stats()
384
            if entry.is_dir(follow_symlinks=True):
385
                yield self.directory_class(**kwargs)
386
                continue
387
            yield self.file_class(**kwargs)
388
389
    def listdir(self, sortkey=None, reverse=False):
390
        '''
391
        Get sorted list (by `is_directory` and `name` properties) of File
392
        objects.
393
394
        :return: sorted list of File instances
395
        :rtype: list of File
396
        '''
397
        if self._listdir_cache is None:
398
            if sortkey:
399
                data = sorted(self._listdir(), key=sortkey, reverse=reverse)
400
            elif reverse:
401
                data = list(reversed(self._listdir()))
402
            else:
403
                data = list(self._listdir())
404
            self._listdir_cache = data
405
        return self._listdir_cache
406
407
408
class TarFileStream(object):
409
    '''
410
    Tarfile which compresses while reading for streaming.
411
412
    Buffsize can be provided, it must be 512 multiple (the tar block size) for
413
    compression.
414
    '''
415
    event_class = threading.Event
416
    thread_class = threading.Thread
417
    tarfile_class = tarfile.open
418
419
    def __init__(self, path, buffsize=10240):
420
        self.path = path
421
        self.name = os.path.basename(path) + ".tgz"
422
423
        self._finished = 0
424
        self._want = 0
425
        self._data = bytes()
426
        self._add = self.event_class()
427
        self._result = self.event_class()
428
        self._tarfile = self.tarfile_class(  # stream write
429
            fileobj=self,
430
            mode="w|gz",
431
            bufsize=buffsize
432
            )
433
        self._th = self.thread_class(target=self.fill)
434
        self._th.start()
435
436
    def fill(self):
437
        self._tarfile.add(self.path, "")
438
        self._tarfile.close()  # force stream flush
439
        self._finished += 1
440
        if not self._result.is_set():
441
            self._result.set()
442
443
    def write(self, data):
444
        self._add.wait()
445
        self._data += data
446
        if len(self._data) > self._want:
447
            self._add.clear()
448
            self._result.set()
449
        return len(data)
450
451
    def read(self, want=0):
452
        if self._finished:
453
            if self._finished == 1:
454
                self._finished += 1
455
                return ""
456
            return EOFError("EOF reached")
457
458
        # Thread communication
459
        self._want = want
460
        self._add.set()
461
        self._result.wait()
462
        self._result.clear()
463
464
        if want:
465
            data = self._data[:want]
466
            self._data = self._data[want:]
467
        else:
468
            data = self._data
469
            self._data = bytes()
470
        return data
471
472
    def __iter__(self):
473
        data = self.read()
474
        while data:
475
            yield data
476
            data = self.read()
477
478
479
class OutsideDirectoryBase(Exception):
480
    pass
481
482
483
class OutsideRemovableBase(Exception):
484
    pass
485
486
487
def fmt_size(size, binary=True):
488
    '''
489
    Get size and unit.
490
491
    :param size: size in bytes
492
    :param binary: whether use binary or standard units, defaults to True
493
    :return: size and unit
494
    :rtype: tuple of int and unit as str
495
    '''
496
    if binary:
497
        fmt_sizes = binary_units
498
        fmt_divider = 1024.
499
    else:
500
        fmt_sizes = standard_units
501
        fmt_divider = 1000.
502
    for fmt in fmt_sizes[:-1]:
503
        if size < 1000:
504
            return (size, fmt)
505
        size /= fmt_divider
506
    return size, fmt_sizes[-1]
507
508
509
def relativize_path(path, base, os_sep=os.sep):
510
    '''
511
    Make absolute path relative to an absolute base.
512
513
    :param path: absolute path
514
    :param base: absolute base path
515
    :param os_sep: path component separator, defaults to current OS separator
516
    :return: relative path
517
    :rtype: str or unicode
518
    :raises OutsideDirectoryBase: if path is not below base
519
    '''
520
    if not check_under_base(path, base, os_sep):
521
        raise OutsideDirectoryBase("%r is not under %r" % (path, base))
522
    prefix_len = len(base)
523
    if not base.endswith(os_sep):
524
        prefix_len += len(os_sep)
525
    return path[prefix_len:]
526
527
528
def abspath_to_urlpath(path, base, os_sep=os.sep):
529
    '''
530
    Make filesystem absolute path uri relative using given absolute base path.
531
532
    :param path: absolute path
533
    :param base: absolute base path
534
    :param os_sep: path component separator, defaults to current OS separator
535
    :return: relative uri
536
    :rtype: str or unicode
537
    :raises OutsideDirectoryBase: if resulting path is not below base
538
    '''
539
    return relativize_path(path, base, os_sep).replace(os_sep, '/')
540
541
542
def urlpath_to_abspath(path, base, os_sep=os.sep):
543
    '''
544
    Make uri relative path fs absolute using a given absolute base path.
545
546
    :param path: relative path
547
    :param base: absolute base path
548
    :param os_sep: path component separator, defaults to current OS separator
549
    :return: absolute path
550
    :rtype: str or unicode
551
    :raises OutsideDirectoryBase: if resulting path is not below base
552
    '''
553
    prefix = base if base.endswith(os_sep) else base + os_sep
554
    realpath = os.path.abspath(prefix + path.replace('/', os_sep))
555
    if base == realpath or realpath.startswith(prefix):
556
        return realpath
557
    raise OutsideDirectoryBase("%r is not under %r" % (realpath, base))
558
559
560
def generic_filename(path):
561
    '''
562
    Extract filename of given path os-indepently, taking care of known path
563
    separators.
564
565
    :param path: path
566
    :return: filename
567
    :rtype: str or unicode (depending on given path)
568
    '''
569
570
    for sep in common_path_separators:
571
        if sep in path:
572
            _, path = path.rsplit(sep, 1)
573
    return path
574
575
576
def clean_restricted_chars(path, restricted_chars=restricted_chars):
577
    '''
578
    Get path without restricted characters.
579
580
    :param path: path
581
    :return: path without restricted characters
582
    :rtype: str or unicode (depending on given path)
583
    '''
584
    for character in restricted_chars:
585
        path = path.replace(character, '_')
586
    return path
587
588
589
def check_forbidden_filename(filename,
590
                             destiny_os=os.name,
591
                             restricted_names=restricted_names):
592
    '''
593
    Get if given filename is forbidden for current OS or filesystem.
594
595
    :param filename:
596
    :param destiny_os: destination operative system
597
    :param fs_encoding: destination filesystem filename encoding
598
    :return: wether is forbidden on given OS (or filesystem) or not
599
    :rtype: bool
600
    '''
601
    if destiny_os == 'nt':
602
        fpc = filename.split('.', 1)[0].upper()
603
        if fpc in nt_device_names:
604
            return True
605
606
    return filename in restricted_names
607
608
609
def check_under_base(path, base, os_sep=os.sep):
610
    '''
611
    Check if given absolute path is under given base.
612
613
    :param path: absolute path
614
    :param base: absolute base path
615
    :return: wether file is under given base or not
616
    :rtype: bool
617
    '''
618
    prefix = base if base.endswith(os_sep) else base + os_sep
619
    return path == base or path.startswith(prefix)
620
621
622
def secure_filename(path, destiny_os=os.name, fs_encoding=compat.fs_encoding):
623
    '''
624
    Get rid of parent path components and special filenames.
625
626
    If path is invalid or protected, return empty string.
627
628
    :param path: unsafe path
629
    :type: str
630
    :param destiny_os: destination operative system
631
    :type destiny_os: str
632
    :return: filename or empty string
633
    :rtype: str
634
    '''
635
    path = generic_filename(path)
636
    path = clean_restricted_chars(path)
637
638
    if check_forbidden_filename(path, destiny_os=destiny_os):
639
        return ''
640
641
    if isinstance(path, bytes):
642
        path = path.decode('latin-1', errors=underscore_replace)
643
644
    # Decode and recover from filesystem encoding in order to strip unwanted
645
    # characters out
646
    kwargs = dict(
647
        os_name=destiny_os,
648
        fs_encoding=fs_encoding,
649
        errors=underscore_replace
650
        )
651
    fs_encoded_path = compat.fsencode(path, **kwargs)
652
    fs_decoded_path = compat.fsdecode(fs_encoded_path, **kwargs)
653
    return fs_decoded_path
654
655
656
def alternative_filename(filename, attempt=None):
657
    '''
658
    Generates an alternative version of given filename.
659
660
    If an number attempt parameter is given, will be used on the alternative
661
    name, a random value will be used otherwise.
662
663
    :param filename: original filename
664
    :param attempt: optional attempt number, defaults to null
665
    :return: new filename
666
    :rtype: str or unicode
667
    '''
668
    filename_parts = filename.rsplit(u'.', 2)
669
    name = filename_parts[0]
670
    ext = ''.join(u'.%s' % ext for ext in filename_parts[1:])
671
    if attempt is None:
672
        choose = random.choice
673
        extra = u' %s' % ''.join(choose(fs_safe_characters) for i in range(8))
674
    else:
675
        extra = u' (%d)' % attempt
676
    return u'%s%s%s' % (name, extra, ext)
677