Completed
Push — dev-4.1-unstable ( 3dbdf3...065df3 )
by Felipe A.
01:02
created

Node.widgets()   A

Complexity

Conditions 2

Size

Total Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
dl 0
loc 14
rs 9.4285
c 0
b 0
f 0
1
#!/usr/bin/env python
2
# -*- coding: UTF-8 -*-
3
4
import os
5
import os.path
6
import re
7
import shutil
8
import codecs
9
import threading
10
import string
11
import tarfile
12
import random
13
import datetime
14
import logging
15
16
from flask import current_app, send_from_directory
17
from werkzeug.utils import cached_property
18
19
from . import compat
20
from .compat import range
21
22
23
logger = logging.getLogger(__name__)

# Replacement text used by the codec error handler registered below; on
# legacy Python a bare literal is a byte string, so it is decoded first.
unicode_underscore = '_'.decode('utf-8') if compat.PY_LEGACY else '_'

# Codec error handler, registered under a module-qualified name, which swaps
# the offending character for an underscore and resumes one position later.
underscore_replace = '%s:underscore' % __name__
codecs.register_error(underscore_replace,
                      lambda error: (unicode_underscore, error.start + 1)
                      )

# Size unit suffixes, smallest first (see fmt_size).
binary_units = ("B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB")
standard_units = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")

# Characters treated as path separators regardless of the running OS.
common_path_separators = '\\/'

# Characters replaced by clean_restricted_chars.
restricted_chars = '\\/\0'

# Names rejected by check_forbidden_filename on every OS.
restricted_names = ('.', '..', '::', os.sep)

# Device names reserved by Windows (rejected when the destination OS is nt).
# The whole COM1-COM9 and LPT1-LPT9 ranges are reserved, not only the first
# few of each.
nt_device_names = (
    'CON', 'PRN', 'AUX', 'NUL',
    'COM1', 'COM2', 'COM3', 'COM4', 'COM5', 'COM6', 'COM7', 'COM8', 'COM9',
    'LPT1', 'LPT2', 'LPT3', 'LPT4', 'LPT5', 'LPT6', 'LPT7', 'LPT8', 'LPT9',
    )

# Alphabet used to build random filename suffixes (see alternative_filename).
fs_safe_characters = string.ascii_uppercase + string.digits
37
38
39
class Node(object):
    '''
    Base class for filesystem entries, holding a path and a flask app.

    While `generic` is True, :meth:`from_urlpath` instantiates the registered
    `directory_class` or `file_class` instead of the class it is called on.
    '''
    generic = True
    directory_class = None  # set later at import time
    file_class = None  # set later at import time

    re_charset = re.compile('; charset=(?P<charset>[^;]+)')
    can_download = False

    def __init__(self, path=None, app=None, **defaults):
        '''
        :param path: filesystem path, passed through `compat.fsdecode` when
                     not empty
        :param app: optional flask application, defaults to `current_app`
        :param defaults: extra values stored straight into the instance dict
        '''
        self.path = compat.fsdecode(path) if path else None
        self.app = current_app if app is None else app
        self.__dict__.update(defaults)

    @property
    def plugin_manager(self):
        '''
        Plugin manager registered on the application extensions.
        '''
        return self.app.extensions['plugin_manager']

    @cached_property
    def widgets(self):
        '''
        List of widgets for this node: a remove button (only when the node
        can be removed) followed by those provided by the plugin manager.
        '''
        if self.can_remove:
            remove_button = self.plugin_manager.create_widget(
                'entry-actions',
                'button',
                file=self,
                css='remove',
                endpoint='remove'
                )
            own_widgets = [remove_button]
        else:
            own_widgets = []
        return own_widgets + self.plugin_manager.get_widgets(file=self)

    @cached_property
    def link(self):
        '''
        First widget placed at 'entry-link', or None if there is none.
        '''
        candidates = (
            widget
            for widget in self.widgets
            if widget.place == 'entry-link'
            )
        return next(candidates, None)

    @cached_property
    def can_remove(self):
        '''
        Whether path lies below the configured `directory_remove` base
        (falsy when that option is unset).
        '''
        removable_base = self.app.config["directory_remove"]
        if not removable_base:
            return removable_base
        return self.path.startswith(removable_base + os.sep)

    @cached_property
    def stats(self):
        '''
        Result of `os.stat` on this node's path.
        '''
        return os.stat(self.path)

    @cached_property
    def parent(self):
        '''
        Directory object for the parent path, or None when this node is the
        configured `directory_base` or has no parent path.
        '''
        own_path = self.path
        if own_path == self.app.config['directory_base']:
            return None
        parent_path = os.path.dirname(own_path) if own_path else None
        if not parent_path:
            return None
        return self.directory_class(parent_path, self.app)

    @cached_property
    def ancestors(self):
        '''
        List of parent nodes, nearest first.
        '''
        chain = []
        node = self.parent
        while node:
            chain.append(node)
            node = node.parent
        return chain

    @property
    def modified(self):
        '''
        Modification time formatted as 'YYYY.mm.dd HH:MM:SS'.
        '''
        moment = datetime.datetime.fromtimestamp(self.stats.st_mtime)
        return moment.strftime('%Y.%m.%d %H:%M:%S')

    @property
    def urlpath(self):
        '''
        Path relative to the configured `directory_base`, using '/' as
        separator.
        '''
        base = self.app.config['directory_base']
        return abspath_to_urlpath(self.path, base)

    @property
    def name(self):
        '''
        Last component of path.
        '''
        return os.path.basename(self.path)

    @property
    def type(self):
        '''
        Mimetype with everything from the first ';' on stripped.
        '''
        bare_type = self.mimetype.split(";", 1)
        return bare_type[0]

    @property
    def category(self):
        '''
        Part of :attr:`type` before the first '/'.
        '''
        pieces = self.type.split('/', 1)
        return pieces[0]

    def remove(self):
        '''
        Check this node is allowed to be removed; actual deletion is left to
        subclasses.

        :raises OutsideRemovableBase: when not under removable base directory
        '''
        if self.can_remove:
            return
        raise OutsideRemovableBase("File outside removable base")

    @classmethod
    def from_urlpath(cls, path, app=None):
        '''
        Build a node from a path as received in an URL, resolving it against
        the `directory_base` option of the given (or current) application.

        Generic classes (attribute `generic` set to True) delegate to
        `directory_class` or `file_class` depending on whether the resolved
        path is a directory.

        :param path: relative path as from URL
        :param app: optional, flask application
        :return: file object pointing to path
        :rtype: File
        '''
        app = app or current_app
        abspath = urlpath_to_abspath(path, app.config['directory_base'])
        if cls.generic:
            if os.path.isdir(abspath):
                kls = cls.directory_class
            else:
                kls = cls.file_class
        else:
            kls = cls
        return kls(path=abspath, app=app)

    @classmethod
    def register_file_class(cls, kls):
        '''
        Set given class as `file_class`; returns it, so it works as a class
        decorator.
        '''
        cls.file_class = kls
        return kls

    @classmethod
    def register_directory_class(cls, kls):
        '''
        Set given class as `directory_class`; returns it, so it works as a
        class decorator.
        '''
        cls.directory_class = kls
        return kls
161
162
163
@Node.register_file_class
class File(Node):
    '''
    Node specialization for regular (non-directory) entries.
    '''
    can_download = True
    can_upload = False
    is_directory = False
    generic = False

    @cached_property
    def widgets(self):
        '''
        List of widgets: a download button (when downloadable) followed by
        the widgets of the base class.
        '''
        own_widgets = []
        if self.can_download:
            download_button = self.plugin_manager.create_widget(
                'entry-actions',
                'button',
                file=self,
                css='download',
                endpoint='download_file'
                )
            own_widgets.append(download_button)
        return own_widgets + super(File, self).widgets

    @property
    def link(self):
        '''
        Widget placed at 'entry-link' if any, otherwise a freshly created
        link widget pointing to the 'open' endpoint.
        '''
        inherited = super(File, self).link
        if inherited is not None:
            return inherited
        return self.plugin_manager.create_widget(
            'entry-link',
            'link',
            file=self,
            endpoint='open'
            )

    @cached_property
    def mimetype(self):
        '''
        Mimetype of path as reported by the plugin manager.
        '''
        return self.plugin_manager.get_mimetype(self.path)

    @cached_property
    def is_file(self):
        '''
        Whether path is a regular file, according to `os.path.isfile`.
        '''
        return os.path.isfile(self.path)

    @property
    def size(self):
        '''
        Human readable size: integer for plain bytes, two decimals otherwise.
        '''
        amount, unit = fmt_size(
            self.stats.st_size,
            self.app.config["use_binary_multiples"]
            )
        template = "%d %s" if unit == binary_units[0] else "%.2f %s"
        return template % (amount, unit)

    @property
    def encoding(self):
        '''
        Charset declared on mimetype, or "default" when there is none.
        '''
        mimetype = self.mimetype
        if ";" in mimetype:
            found = self.re_charset.search(mimetype)
            if found:
                return found.group("charset") or "default"
        return "default"

    def remove(self):
        '''
        Delete this file from the filesystem.

        :raises OutsideRemovableBase: when not under removable base directory
        '''
        super(File, self).remove()
        os.unlink(self.path)

    def download(self):
        '''
        Build a response serving this file as an attachment, using flask's
        send_from_directory.

        :returns: Response object as returned by flask's send_from_directory
        :rtype: flask.Response
        '''
        dirname, basename = os.path.split(self.path)
        return send_from_directory(dirname, basename, as_attachment=True)
240
241
242
@Node.register_directory_class
class Directory(Node):
    '''
    Node specialization for directories.
    '''
    _listdir_cache = None
    mimetype = 'inode/directory'
    is_file = False
    size = 0
    encoding = 'default'
    generic = False

    @cached_property
    def widgets(self):
        '''
        List of widgets: upload scripts and upload control (when uploads are
        allowed), a download button (when downloadable) and finally the
        widgets of the base class.
        '''
        widgets = []
        if self.can_upload:
            widgets.extend((
                self.plugin_manager.create_widget(
                    'head',
                    'script',
                    file=self,
                    endpoint='static',
                    filename='browse.directory.head.js'
                ),
                self.plugin_manager.create_widget(
                    'scripts',
                    'script',
                    file=self,
                    endpoint='static',
                    filename='browse.directory.body.js'
                ),
                self.plugin_manager.create_widget(
                    'header',
                    'upload',
                    file=self,
                    text='Upload',
                    endpoint='upload'
                    )
                ))
        if self.can_download:
            widgets.append(
                self.plugin_manager.create_widget(
                    'entry-actions',
                    'button',
                    file=self,
                    css='download',
                    endpoint='download_directory'
                    )
                )
        return widgets + super(Directory, self).widgets

    @property
    def link(self):
        '''
        Widget placed at 'entry-link' if any, otherwise a freshly created
        link widget pointing to the 'browse' endpoint.
        '''
        widget = super(Directory, self).link
        if widget is None:
            return self.plugin_manager.create_widget(
                'entry-link',
                'link',
                file=self,
                endpoint='browse'
                )
        return widget

    @cached_property
    def is_directory(self):
        '''
        Whether path is a directory, according to `os.path.isdir`.
        '''
        return os.path.isdir(self.path)

    @cached_property
    def can_download(self):
        '''
        Value of the `directory_downloadable` application option.
        '''
        return self.app.config['directory_downloadable']

    @cached_property
    def can_upload(self):
        '''
        Whether path is the configured `directory_upload` base or lies below
        it (falsy when that option is unset).
        '''
        dirbase = self.app.config["directory_upload"]
        return dirbase and (
            dirbase == self.path or
            self.path.startswith(dirbase + os.sep)
            )

    @cached_property
    def is_empty(self):
        '''
        Whether this directory has no entries.

        The cached listing is used when available, otherwise iteration stops
        at the first entry found.
        '''
        if self._listdir_cache is not None:
            # empty means the cached listing holds NO entries
            return not self._listdir_cache
        for entry in self._listdir():
            return False
        return True

    def remove(self):
        '''
        Remove directory tree.

        :raises OutsideRemovableBase: when not under removable base directory
        '''
        super(Directory, self).remove()
        shutil.rmtree(self.path)

    def download(self):
        '''
        Get a Flask Response object streaming a tarball of this directory.

        :returns: Response object
        :rtype: flask.Response
        '''
        return self.app.response_class(
            TarFileStream(
                self.path,
                self.app.config["directory_tar_buffsize"]
                ),
            mimetype="application/octet-stream"
            )

    def contains(self, filename):
        '''
        Check if directory contains an entry with given filename.

        :param filename: filename to check
        :type filename: str
        :returns: True if exists, False otherwise.
        :rtype: bool
        '''
        return os.path.exists(os.path.join(self.path, filename))

    def choose_filename(self, filename, attempts=999):
        '''
        Get a new filename which does not collide with any entry on
        directory, based on given filename.

        Numbered alternatives are tried first; once they are exhausted,
        random alternatives are tried until a free one is found.

        :param filename: base filename
        :type filename: str
        :param attempts: number of attempts, defaults to 999
        :type attempts: int
        :returns: filename
        :rtype: str
        '''
        new_filename = filename
        for attempt in range(2, attempts + 1):
            if not self.contains(new_filename):
                return new_filename
            new_filename = alternative_filename(filename, attempt)
        while self.contains(new_filename):
            new_filename = alternative_filename(filename)
        return new_filename

    def _listdir(self):
        '''
        Iter unsorted entries on this directory.

        :yields: Directory or File instance for each entry in directory
        :ytype: Node
        '''
        # NOTE(review): stats are only precomputed on nt, presumably because
        # DirEntry.stat() needs no extra system call there - confirm
        precomputed_stats = os.name == 'nt'
        for entry in compat.scandir(self.path):
            kwargs = {'path': entry.path, 'app': self.app, 'parent': self}
            if precomputed_stats and not entry.is_symlink():
                # DirEntry exposes `stat()`; there is no `stats()` method
                kwargs['stats'] = entry.stat()
            if entry.is_dir(follow_symlinks=True):
                yield self.directory_class(**kwargs)
                continue
            yield self.file_class(**kwargs)

    def listdir(self, sortkey=None, reverse=False):
        '''
        Get list of Node objects for the entries of this directory.

        The listing is computed and cached on the first call; later calls
        return the cached list regardless of the arguments given.

        :param sortkey: optional key function used for sorting, entries are
                        left in iteration order when omitted
        :param reverse: whether the resulting order should be reversed
        :return: list of Directory and File instances
        :rtype: list of Node
        '''
        if self._listdir_cache is None:
            if sortkey:
                data = sorted(self._listdir(), key=sortkey, reverse=reverse)
            else:
                data = list(self._listdir())
                if reverse:
                    # `reversed` cannot consume a generator, so the
                    # materialized list is reversed in place instead
                    data.reverse()
            self._listdir_cache = data
        return self._listdir_cache
416
417
418
class TarFileStream(object):
    '''
    Tarfile which compresses while reading for streaming.

    Archiving runs on a background thread which is paused, by means of two
    events, until the consumer asks for more data with :meth:`read`.

    Buffsize can be provided, it must be 512 multiple (the tar block size) for
    compression.
    '''
    event_class = threading.Event
    thread_class = threading.Thread
    tarfile_class = tarfile.open

    def __init__(self, path, buffsize=10240):
        '''
        :param path: path of the directory to archive
        :param buffsize: buffer size handed to tarfile, defaults to 10240
        '''
        self.path = path
        self.name = os.path.basename(path) + ".tgz"

        self._finished = 0  # 0: running, 1: archive done, 2: EOF delivered
        self._want = 0  # amount of bytes requested by the pending read
        self._data = bytes()
        self._add = self.event_class()  # reader allows writer to go on
        self._result = self.event_class()  # writer has data for reader
        self._tarfile = self.tarfile_class(  # stream write
            fileobj=self,
            mode="w|gz",
            bufsize=buffsize
            )
        self._th = self.thread_class(target=self.fill)
        self._th.start()

    def fill(self):
        '''
        Thread target: archive the whole path and flag the stream as
        finished.
        '''
        try:
            self._tarfile.add(self.path, "")
            self._tarfile.close()  # force stream flush
        finally:
            # always wake the reader up, even if archiving failed, so it
            # does not wait forever for data which will never come
            self._finished += 1
            if not self._result.is_set():
                self._result.set()

    def write(self, data):
        '''
        File-like write used by tarfile from the archiving thread; blocks
        until a reader is waiting for data.

        :param data: compressed bytes
        :returns: amount of bytes taken
        :rtype: int
        '''
        self._add.wait()
        self._data += data
        if len(self._data) > self._want:
            self._add.clear()
            self._result.set()
        return len(data)

    def read(self, want=0):
        '''
        Get buffered data, waiting for the archiving thread when needed.

        :param want: maximum amount of bytes, or 0 for everything available
        :returns: data, empty once when the archive is finished
        :rtype: bytes
        :raises EOFError: when called again after the end has been signaled
        '''
        if self._finished:
            if self._finished == 1:
                self._finished += 1
                return bytes()  # same type as any other returned chunk
            raise EOFError("EOF reached")

        # Thread communication
        self._want = want
        self._add.set()
        self._result.wait()
        self._result.clear()

        if want:
            data = self._data[:want]
            self._data = self._data[want:]
        else:
            data = self._data
            self._data = bytes()
        return data

    def __iter__(self):
        '''
        Iterate over data chunks until an empty one is received.
        '''
        data = self.read()
        while data:
            yield data
            data = self.read()
487
488
489
class OutsideDirectoryBase(Exception):
    '''
    Raised when a path is not located under the expected base directory.
    '''
491
492
493
class OutsideRemovableBase(Exception):
    '''
    Raised when trying to remove a path not under the removable base.
    '''
495
496
497
def fmt_size(size, binary=True):
    '''
    Scale a size in bytes down to the first unit in which it is below 1000.

    :param size: size in bytes
    :param binary: whether use binary (1024) or standard (1000) units,
                   defaults to True
    :return: scaled size and its unit
    :rtype: tuple of number and unit as str
    '''
    if binary:
        units, divider = binary_units, 1024.
    else:
        units, divider = standard_units, 1000.
    last = len(units) - 1
    position = 0
    # stop at the biggest unit even if the value is still too large
    while position < last and not size < 1000:
        size /= divider
        position += 1
    return size, units[position]
517
518
519
def relativize_path(path, base, os_sep=os.sep):
    '''
    Strip an absolute base (and the separator after it) from an absolute
    path.

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: relative path
    :rtype: str or unicode
    :raises OutsideDirectoryBase: if path is not below base
    '''
    if not check_under_base(path, base, os_sep):
        raise OutsideDirectoryBase("%r is not under %r" % (path, base))
    if base.endswith(os_sep):
        skip = len(base)
    else:
        # base comes without trailing separator, drop it from path too
        skip = len(base) + len(os_sep)
    return path[skip:]
536
537
538
def abspath_to_urlpath(path, base, os_sep=os.sep):
    '''
    Convert an absolute filesystem path into an uri path relative to given
    absolute base path.

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: relative uri
    :rtype: str or unicode
    :raises OutsideDirectoryBase: if resulting path is not below base
    '''
    relative = relativize_path(path, base, os_sep)
    return relative.replace(os_sep, '/')
550
551
552
def urlpath_to_abspath(path, base, os_sep=os.sep):
    '''
    Convert an uri relative path into an absolute filesystem path below
    given absolute base path.

    :param path: relative path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: absolute path
    :rtype: str or unicode
    :raises OutsideDirectoryBase: if resulting path is not below base
    '''
    if base.endswith(os_sep):
        prefix = base
    else:
        prefix = base + os_sep
    candidate = os.path.abspath(prefix + path.replace('/', os_sep))
    # normalization may have escaped from base (ie. via '..' components)
    if base != candidate and not candidate.startswith(prefix):
        raise OutsideDirectoryBase("%r is not under %r" % (candidate, base))
    return candidate
568
569
570
def generic_filename(path):
    '''
    Get the filename of given path regardless of the running OS, by keeping
    only what follows the last occurrence of every known path separator.

    :param path: path
    :return: filename
    :rtype: str or unicode (depending on given path)
    '''
    filename = path
    for separator in common_path_separators:
        # a missing separator leaves the value untouched
        filename = filename.rsplit(separator, 1)[-1]
    return filename
584
585
586
def clean_restricted_chars(path, restricted_chars=restricted_chars):
    '''
    Replace every restricted character of path with an underscore.

    :param path: path
    :param restricted_chars: characters to replace, defaults to the module
                             level `restricted_chars`
    :return: path without restricted characters
    :rtype: str or unicode (depending on given path)
    '''
    cleaned = path
    for forbidden in restricted_chars:
        cleaned = cleaned.replace(forbidden, '_')
    return cleaned
597
598
599
def check_forbidden_filename(filename,
                             destiny_os=os.name,
                             restricted_names=restricted_names):
    '''
    Get if given filename is forbidden for given OS.

    On nt, the part of the filename before its first dot is compared, case
    insensitively, against the known device names.

    :param filename: filename to check
    :param destiny_os: destination operative system
    :param restricted_names: names forbidden on every OS, defaults to the
                             module level `restricted_names`
    :return: whether is forbidden on given OS or not
    :rtype: bool
    '''
    is_nt_device = (
        destiny_os == 'nt' and
        filename.partition('.')[0].upper() in nt_device_names
        )
    return is_nt_device or filename in restricted_names
617
618
619
def check_under_base(path, base, os_sep=os.sep):
    '''
    Get whether given absolute path is the given base itself or is located
    below it.

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: whether file is under given base or not
    :rtype: bool
    '''
    if base.endswith(os_sep):
        prefix = base
    else:
        # avoid matching siblings which merely share the base as name prefix
        prefix = base + os_sep
    if path == base:
        return True
    return path.startswith(prefix)
630
631
632
def secure_filename(path, destiny_os=os.name, fs_encoding=compat.fs_encoding):
    '''
    Get rid of parent path components and special filenames.

    If path is invalid or protected, return empty string.

    :param path: unsafe path
    :type: str
    :param destiny_os: destination operative system
    :type destiny_os: str
    :param fs_encoding: destination filesystem filename encoding
    :type fs_encoding: str
    :return: filename or empty string
    :rtype: str
    '''
    # Bytes are decoded before anything else: the helpers below operate with
    # text separators and names, which cannot be mixed with bytes on
    # non-legacy Python.
    if isinstance(path, bytes):
        path = path.decode('latin-1', errors=underscore_replace)

    path = generic_filename(path)
    path = clean_restricted_chars(path)

    if check_forbidden_filename(path, destiny_os=destiny_os):
        return ''

    # Decode and recover from filesystem encoding in order to strip unwanted
    # characters out
    kwargs = dict(
        os_name=destiny_os,
        fs_encoding=fs_encoding,
        errors=underscore_replace
        )
    fs_encoded_path = compat.fsencode(path, **kwargs)
    fs_decoded_path = compat.fsdecode(fs_encoded_path, **kwargs)
    return fs_decoded_path
664
665
666
def alternative_filename(filename, attempt=None):
    '''
    Build an alternative name for given filename by inserting a marker
    between its name and its (up to two) extensions.

    The marker is the attempt number between parentheses when given, eight
    random filesystem-safe characters otherwise.

    :param filename: original filename
    :param attempt: optional attempt number, defaults to null
    :return: new filename
    :rtype: str or unicode
    '''
    pieces = filename.rsplit(u'.', 2)
    name, extensions = pieces[0], pieces[1:]
    ext = ''.join(u'.%s' % extension for extension in extensions)
    if attempt is None:
        token = ''.join(
            random.choice(fs_safe_characters)
            for _ in range(8)
            )
        extra = u' %s' % token
    else:
        extra = u' (%d)' % attempt
    return u'%s%s%s' % (name, extra, ext)
687