Completed
Push — dev-4.1-unstable ( fbeeec...3dbdf3 )
by Felipe A.
01:04
created

Directory.widgets()   B

Complexity

Conditions 3

Size

Total Lines 37

Duplication

Lines 37
Ratio 100 %

Importance

Changes 0
Metric Value
cc 3
dl 37
loc 37
rs 8.8571
c 0
b 0
f 0
1
#!/usr/bin/env python
2
# -*- coding: UTF-8 -*-
3
4
import os
5
import os.path
6
import re
7
import shutil
8
import codecs
9
import threading
10
import string
11
import tarfile
12
import random
13
import datetime
14
import logging
15
16
from flask import current_app, send_from_directory
17
from werkzeug.utils import cached_property
18
19
from . import compat
20
from .compat import range
21
22
23
logger = logging.getLogger(__name__)

# Replacement text used by the codec error handler registered below; on
# legacy interpreters it is decoded so the handler hands back unicode text.
unicode_underscore = '_'.decode('utf-8') if compat.PY_LEGACY else '_'

# Name of a codec error handler which substitutes an underscore for the
# offending position and resumes right after it (``error.start + 1``).
underscore_replace = '%s:underscore' % __name__
codecs.register_error(underscore_replace,
                      lambda error: (unicode_underscore, error.start + 1)
                      )

# Unit suffixes used by fmt_size, smallest first.
binary_units = ("B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB")
standard_units = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")

# Characters treated as path separators regardless of the running OS.
common_path_separators = '\\/'
# Characters replaced by an underscore in cleaned filenames.
restricted_chars = '\\/\0'
# Filenames which are never accepted as-is.
restricted_names = ('.', '..', '::', os.sep)

# Reserved device names on Windows: CON, PRN, AUX, NUL plus COM1-COM9 and
# LPT1-LPT9. The previous list stopped at COM4/LPT3, letting the remaining
# reserved names pass the forbidden filename check.
nt_device_names = (
    ('CON', 'PRN', 'AUX', 'NUL') +
    tuple('COM%d' % number for number in range(1, 10)) +
    tuple('LPT%d' % number for number in range(1, 10))
    )

# Alphabet used to build random filename suffixes.
fs_safe_characters = string.ascii_uppercase + string.digits
37
38
39
class Node(object):
    '''
    Base abstraction for a filesystem entry.

    Concrete types are registered with :meth:`register_file_class` and
    :meth:`register_directory_class`; classes with `generic` set to True
    delegate to those registered types in :meth:`from_urlpath`.
    '''
    generic = True
    directory_class = None  # set later at import time
    file_class = None  # set later at import time

    re_charset = re.compile('; charset=(?P<charset>[^;]+)')
    can_download = False

    def __init__(self, path=None, app=None, **defaults):
        '''
        :param path: filesystem path, decoded with `compat.fsdecode` if given
        :param app: optional, flask application (current app otherwise)
        :param defaults: extra attributes set directly on the instance
        '''
        if path:
            self.path = compat.fsdecode(path)
        else:
            self.path = None
        if app is None:
            self.app = current_app
        else:
            self.app = app
        # Keyword defaults land on the instance dict, so they can pre-seed
        # values such as `stats` or `parent`.
        self.__dict__.update(defaults)

    @property
    def plugin_manager(self):
        '''
        Plugin manager registered on the application extensions.
        '''
        return self.app.extensions['plugin_manager']

    @cached_property
    def widgets(self):
        '''
        List of widgets for this node: those given by the plugin manager,
        the base script widgets and, if removable, the remove button.
        '''
        manager = self.plugin_manager
        collected = manager.get_widgets(file=self)
        head_script = manager.create_widget(
            'head',
            'script',
            file=self,
            endpoint='static',
            filename='browse.body.js'
            )
        body_script = manager.create_widget(
            'scripts',
            'script',
            file=self,
            endpoint='static',
            filename='browse.body.js'
            )
        collected.extend((head_script, body_script))
        if self.can_remove:
            collected.append(
                manager.create_widget(
                    'entry-actions',
                    'button',
                    file=self,
                    css='remove',
                    endpoint='remove'
                    )
                )
        return collected

    @cached_property
    def link(self):
        '''
        First widget placed at 'entry-link', or None if there is none.
        '''
        for candidate in self.widgets:
            if candidate.place == 'entry-link':
                return candidate

    @cached_property
    def can_remove(self):
        '''
        Truthy when the path lies below the configured removable base.
        '''
        removable_base = self.app.config["directory_remove"]
        if not removable_base:
            return removable_base
        return self.path.startswith(removable_base + os.sep)

    @cached_property
    def stats(self):
        '''
        Result of `os.stat` on this node path.
        '''
        return os.stat(self.path)

    @cached_property
    def parent(self):
        '''
        Parent directory node, or None for the base directory or when
        there is no path (or no parent path) available.
        '''
        current = self.path
        if current == self.app.config['directory_base'] or not current:
            return None
        upper = os.path.dirname(current)
        if not upper:
            return None
        return self.directory_class(upper, self.app)

    @cached_property
    def ancestors(self):
        '''
        List of parent nodes, nearest first.
        '''
        chain = []
        node = self.parent
        while node:
            chain.append(node)
            node = node.parent
        return chain

    @property
    def modified(self):
        '''
        Modification time formatted as 'YYYY.mm.dd HH:MM:SS'.
        '''
        mtime = self.stats.st_mtime
        return datetime.datetime.fromtimestamp(mtime).strftime(
            '%Y.%m.%d %H:%M:%S'
            )

    @property
    def urlpath(self):
        '''
        Path relative to the configured base directory, in url form.
        '''
        base = self.app.config['directory_base']
        return abspath_to_urlpath(self.path, base)

    @property
    def name(self):
        '''
        Last component of the node path.
        '''
        return os.path.basename(self.path)

    @property
    def type(self):
        '''
        Mimetype without its parameters (anything after the first ';').
        '''
        return self.mimetype.partition(";")[0]

    def remove(self):
        '''
        Check this node can be removed (subclasses do the actual removal).

        :raises OutsideRemovableBase: when not under removable base directory
        '''
        if self.can_remove:
            return
        raise OutsideRemovableBase("File outside removable base")

    @classmethod
    def from_urlpath(cls, path, app=None):
        '''
        Alternative constructor taking a path as it comes from an URL, which
        is resolved against the base directory of the given (or current) app.

        If class has attribute `generic` set to True, `directory_class` or
        `file_class` will be used as type.

        :param path: relative path as from URL
        :param app: optional, flask application
        :return: file object pointing to path
        :rtype: File
        '''
        app = app or current_app
        abspath = urlpath_to_abspath(path, app.config['directory_base'])
        if cls.generic:
            if os.path.isdir(abspath):
                kls = cls.directory_class
            else:
                kls = cls.file_class
        else:
            kls = cls
        return kls(path=abspath, app=app)

    @classmethod
    def register_file_class(cls, kls):
        '''
        Class decorator: use given class for file nodes.
        '''
        cls.file_class = kls
        return kls

    @classmethod
    def register_directory_class(cls, kls):
        '''
        Class decorator: use given class for directory nodes.
        '''
        cls.directory_class = kls
        return kls
172
173
174
@Node.register_file_class
class File(Node):
    '''
    Regular file node.
    '''
    can_download = True
    can_upload = False
    is_directory = False
    generic = False

    @cached_property
    def widgets(self):
        '''
        Base node widgets plus, if downloadable, the download button.
        '''
        collected = super(File, self).widgets
        if not self.can_download:
            return collected
        collected.append(
            self.plugin_manager.create_widget(
                'entry-actions',
                'button',
                file=self,
                css='download',
                endpoint='download_file'
                )
            )
        return collected

    @property
    def link(self):
        '''
        Registered 'entry-link' widget, or a default link to 'open'.
        '''
        registered = super(File, self).link
        if registered is not None:
            return registered
        return self.plugin_manager.create_widget(
            'entry-link',
            'link',
            file=self,
            endpoint='open'
            )

    @cached_property
    def mimetype(self):
        '''
        Mimetype as given by the plugin manager for this path.
        '''
        return self.plugin_manager.get_mimetype(self.path)

    @cached_property
    def is_file(self):
        '''
        Whether path points to a regular file.
        '''
        return os.path.isfile(self.path)

    @property
    def size(self):
        '''
        Human readable size: integer for plain bytes, two decimals
        otherwise.
        '''
        amount, unit = fmt_size(
            self.stats.st_size,
            self.app.config["use_binary_multiples"]
            )
        template = "%d %s" if unit == binary_units[0] else "%.2f %s"
        return template % (amount, unit)

    @property
    def encoding(self):
        '''
        Charset declared on the mimetype, "default" if there is none.
        '''
        mimetype = self.mimetype
        if ";" not in mimetype:
            return "default"
        found = self.re_charset.search(mimetype)
        if found is None:
            return "default"
        return found.group("charset") or "default"

    def remove(self):
        '''
        Delete this file from the filesystem.

        :raises OutsideRemovableBase: when not under removable base directory
        '''
        super(File, self).remove()
        os.unlink(self.path)

    def download(self):
        '''
        Build a response sending this file as an attachment.

        :returns: Response object as returned by flask's send_from_directory
        :rtype: flask.Response
        '''
        return send_from_directory(
            os.path.dirname(self.path),
            os.path.basename(self.path),
            as_attachment=True
            )
250
251
252
@Node.register_directory_class
class Directory(Node):
    '''
    Directory node.
    '''
    _listdir_cache = None
    mimetype = 'inode/directory'
    is_file = False
    size = 0
    encoding = 'default'
    generic = False

    @cached_property
    def widgets(self):
        '''
        Base node widgets plus the upload widgets (if uploadable) and the
        download button (if downloadable).
        '''
        widgets = super(Directory, self).widgets
        if self.can_upload:
            widgets.extend((
                self.plugin_manager.create_widget(
                    'head',
                    'script',
                    file=self,
                    endpoint='static',
                    filename='browse.directory.head.js'
                ),
                self.plugin_manager.create_widget(
                    'scripts',
                    'script',
                    file=self,
                    endpoint='static',
                    filename='browse.directory.body.js'
                ),
                self.plugin_manager.create_widget(
                    'header',
                    'upload',
                    file=self,
                    text='Upload',
                    endpoint='upload'
                    )
                ))
        if self.can_download:
            action = self.plugin_manager.create_widget(
                'entry-actions',
                'button',
                file=self,
                css='download',
                endpoint='download_directory'
                )
            widgets.append(action)
        return widgets

    @property
    def link(self):
        '''
        Registered 'entry-link' widget, or a default link to 'browse'.
        '''
        widget = super(Directory, self).link
        if widget is None:
            return self.plugin_manager.create_widget(
                'entry-link',
                'link',
                file=self,
                endpoint='browse'
                )
        return widget

    @cached_property
    def is_directory(self):
        '''
        Whether path points to a directory.
        '''
        return os.path.isdir(self.path)

    @cached_property
    def can_download(self):
        '''
        Value of the 'directory_downloadable' app config option.
        '''
        return self.app.config['directory_downloadable']

    @cached_property
    def can_upload(self):
        '''
        Truthy when this directory is the configured upload base or lies
        below it.
        '''
        dirbase = self.app.config["directory_upload"]
        return dirbase and (
            dirbase == self.path or
            self.path.startswith(dirbase + os.sep)
            )

    @cached_property
    def is_empty(self):
        '''
        Whether this directory has no entries.
        '''
        if self._listdir_cache is not None:
            # An empty cached listing means an empty directory (the former
            # `bool(cache)` returned the opposite answer).
            return not self._listdir_cache
        for entry in self._listdir():
            return False
        return True

    def remove(self):
        '''
        Remove directory tree.

        :raises OutsideRemovableBase: when not under removable base directory
        '''
        super(Directory, self).remove()
        shutil.rmtree(self.path)

    def download(self):
        '''
        Get a Flask Response object streaming a tarball of this directory.

        :returns: Response object
        :rtype: flask.Response
        '''
        return self.app.response_class(
            TarFileStream(
                self.path,
                self.app.config["directory_tar_buffsize"]
                ),
            mimetype="application/octet-stream"
            )

    def contains(self, filename):
        '''
        Check if directory contains an entry with given filename.

        :param filename: filename to check
        :type filename: str
        :returns: True if exists, False otherwise.
        :rtype: bool
        '''
        return os.path.exists(os.path.join(self.path, filename))

    def choose_filename(self, filename, attempts=999):
        '''
        Get a new filename which does not collide with any entry on
        directory, based on given filename.

        Numbered alternatives are tried first; once they run out, random
        alternatives are generated until a free one is found.

        :param filename: base filename
        :type filename: str
        :param attempts: number of attempts, defaults to 999
        :type attempts: int
        :returns: filename
        :rtype: str
        '''
        new_filename = filename
        for attempt in range(2, attempts + 1):
            if not self.contains(new_filename):
                return new_filename
            new_filename = alternative_filename(filename, attempt)
        while self.contains(new_filename):
            new_filename = alternative_filename(filename)
        return new_filename

    def _listdir(self):
        '''
        Iter unsorted entries on this directory.

        :yields: Directory or File instance for each entry in directory
        :ytype: Node
        '''
        precomputed_stats = os.name == 'nt'
        for entry in compat.scandir(self.path):
            kwargs = {'path': entry.path, 'app': self.app, 'parent': self}
            if precomputed_stats and not entry.is_symlink():
                # scandir entries expose `stat()`; the former `stats()` call
                # raised AttributeError whenever this branch ran.
                kwargs['stats'] = entry.stat()
            if entry.is_dir(follow_symlinks=True):
                yield self.directory_class(**kwargs)
                continue
            yield self.file_class(**kwargs)

    def listdir(self, sortkey=None, reverse=False):
        '''
        Get list of nodes on this directory, sorted if `sortkey` is given.

        The result is cached on first call: later calls return the cached
        list regardless of the parameters they receive.

        :param sortkey: optional key function used for sorting
        :param reverse: whether reverse the resulting order
        :return: list of Directory and File instances
        :rtype: list of Node
        '''
        if self._listdir_cache is None:
            if sortkey:
                data = sorted(self._listdir(), key=sortkey, reverse=reverse)
            else:
                data = list(self._listdir())
                if reverse:
                    # `reversed` does not accept generators, so the listing
                    # is materialized first and reversed in place.
                    data.reverse()
            self._listdir_cache = data
        return self._listdir_cache
425
426
427
class TarFileStream(object):
    '''
    Tarfile which compresses while reading for streaming.

    The archive is written by a background thread into this object (which
    acts as the tarfile's output file object) and handed over to the reader
    through :meth:`read` or iteration.

    Buffsize can be provided, it must be 512 multiple (the tar block size) for
    compression.
    '''
    event_class = threading.Event
    thread_class = threading.Thread
    tarfile_class = tarfile.open

    def __init__(self, path, buffsize=10240):
        '''
        :param path: path of the directory (or file) to archive
        :param buffsize: tarfile stream buffer size, defaults to 10240
        '''
        self.path = path
        self.name = os.path.basename(path) + ".tgz"

        self._finished = 0  # 0: running, 1: writer done, 2: EOF delivered
        self._want = 0
        self._data = bytes()
        self._add = self.event_class()  # reader asks writer for data
        self._result = self.event_class()  # writer tells reader to go on
        self._tarfile = self.tarfile_class(  # stream write
            fileobj=self,
            mode="w|gz",
            bufsize=buffsize
            )
        self._th = self.thread_class(target=self.fill)
        self._th.start()

    def fill(self):
        '''
        Writer thread target: archive the whole path and flag completion.
        '''
        try:
            self._tarfile.add(self.path, "")
            self._tarfile.close()  # force stream flush
        finally:
            # Always flag completion, even if archiving failed, otherwise a
            # reader waiting on `_result` would block forever.
            self._finished += 1
            if not self._result.is_set():
                self._result.set()

    def write(self, data):
        '''
        File-like write used by tarfile, blocks until a reader wants data.

        :param data: chunk of compressed archive
        :returns: number of bytes accepted
        :rtype: int
        '''
        self._add.wait()
        self._data += data
        if len(self._data) > self._want:
            self._add.clear()
            self._result.set()
        return len(data)

    def _take(self, want):
        '''
        Pop up to `want` buffered bytes (the whole buffer if `want` is 0).
        '''
        if want:
            data = self._data[:want]
            self._data = self._data[want:]
        else:
            data = self._data
            self._data = bytes()
        return data

    def read(self, want=0):
        '''
        Get next chunk of archive data.

        :param want: maximum number of bytes, 0 means whatever is available
        :returns: data, empty once when archive is exhausted
        :rtype: bytes
        :raises EOFError: if read again after the empty end-of-stream chunk
        '''
        if self._finished:
            if self._data:
                # Bytes left over by a previous sized read must not be lost
                return self._take(want)
            if self._finished == 1:
                self._finished += 1
                return bytes()
            raise EOFError("EOF reached")

        # Thread communication
        self._want = want
        self._add.set()
        self._result.wait()
        self._result.clear()

        return self._take(want)

    def __iter__(self):
        '''
        Yield archive chunks until an empty one is read.
        '''
        data = self.read()
        while data:
            yield data
            data = self.read()
496
497
498
class OutsideDirectoryBase(Exception):
    '''
    Raised when a path is not under the expected base directory.
    '''
500
501
502
class OutsideRemovableBase(Exception):
    '''
    Raised when removing a node which is not under the removable base.
    '''
504
505
506
def fmt_size(size, binary=True):
    '''
    Scale a byte count down to the largest fitting unit.

    The value is divided (by 1024 for binary units, by 1000 otherwise) for
    as long as it is not below 1000 and a bigger unit is available.

    :param size: size in bytes
    :param binary: whether use binary or standard units, defaults to True
    :return: size and unit
    :rtype: tuple of number and unit as str
    '''
    units, step = (binary_units, 1024.) if binary else (standard_units, 1000.)
    last = len(units) - 1
    position = 0
    while position < last and not size < 1000:
        size /= step
        position += 1
    return size, units[position]
526
527
528
def relativize_path(path, base, os_sep=os.sep):
    '''
    Strip an absolute base (and its trailing separator) from an absolute
    path.

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: relative path
    :rtype: str or unicode
    :raises OutsideDirectoryBase: if path is not below base
    '''
    if check_under_base(path, base, os_sep):
        # Skip the base plus the separator following it, unless the base
        # already ends with that separator.
        skip = len(base) if base.endswith(os_sep) else len(base) + len(os_sep)
        return path[skip:]
    raise OutsideDirectoryBase("%r is not under %r" % (path, base))
545
546
547
def abspath_to_urlpath(path, base, os_sep=os.sep):
    '''
    Convert an absolute filesystem path into an uri path relative to the
    given absolute base path.

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: relative uri
    :rtype: str or unicode
    :raises OutsideDirectoryBase: if resulting path is not below base
    '''
    relative = relativize_path(path, base, os_sep)
    return relative.replace(os_sep, '/')
559
560
561
def urlpath_to_abspath(path, base, os_sep=os.sep):
    '''
    Make uri relative path fs absolute using a given absolute base path.

    The resulting path is normalized with `os.path.abspath`, so parent
    references ('..') are resolved before checking it against base.

    :param path: relative path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: absolute path
    :rtype: str or unicode
    :raises OutsideDirectoryBase: if resulting path is not below base
    '''
    prefix = base if base.endswith(os_sep) else base + os_sep
    realpath = os.path.abspath(prefix + path.replace('/', os_sep))
    if (
      base == realpath or
      # abspath drops the trailing separator, so a base given with one must
      # still be recognized as the base directory itself
      realpath + os_sep == prefix or
      realpath.startswith(prefix)
      ):
        return realpath
    raise OutsideDirectoryBase("%r is not under %r" % (realpath, base))
577
578
579
def generic_filename(path):
    '''
    Get the last component of given path, whatever the OS, by splitting on
    every known path separator.

    :param path: path
    :return: filename
    :rtype: str or unicode (depending on given path)
    '''
    filename = path
    for separator in common_path_separators:
        # Keep what follows the last separator (everything if there is none)
        filename = filename.rsplit(separator, 1)[-1]
    return filename
593
594
595
def clean_restricted_chars(path, restricted_chars=restricted_chars):
    '''
    Replace every restricted character on path with an underscore.

    :param path: path
    :param restricted_chars: characters to replace
    :return: path without restricted characters
    :rtype: str or unicode (depending on given path)
    '''
    cleaned = path
    for forbidden in restricted_chars:
        if forbidden in cleaned:
            cleaned = cleaned.replace(forbidden, '_')
    return cleaned
606
607
608
def check_forbidden_filename(filename,
                             destiny_os=os.name,
                             restricted_names=restricted_names):
    '''
    Tell if given filename is forbidden on the destination OS.

    For 'nt', the part of the filename before its first dot is matched
    (case-insensitively) against the known device names.

    :param filename: filename to check
    :param destiny_os: destination operative system
    :param restricted_names: filenames which are always forbidden
    :return: whether is forbidden on given OS or not
    :rtype: bool
    '''
    is_nt_device = (
        destiny_os == 'nt' and
        filename.partition('.')[0].upper() in nt_device_names
        )
    return is_nt_device or filename in restricted_names
626
627
628
def check_under_base(path, base, os_sep=os.sep):
    '''
    Check if given absolute path is under given base.

    The base directory itself is considered to be under base, whether or
    not base (or path) is written with a trailing separator.

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: whether file is under given base or not
    :rtype: bool
    '''
    prefix = base if base.endswith(os_sep) else base + os_sep
    return (
        path == base or
        # same directory, base written with trailing separator
        path + os_sep == prefix or
        path.startswith(prefix)
        )
639
640
641
def secure_filename(path, destiny_os=os.name, fs_encoding=compat.fs_encoding):
    '''
    Reduce an unsafe path to a bare filename, dropping parent path
    components and replacing restricted characters.

    If path is invalid or protected, return empty string.

    :param path: unsafe path
    :type: str
    :param destiny_os: destination operative system
    :type destiny_os: str
    :param fs_encoding: destination filesystem filename encoding
    :return: filename or empty string
    :rtype: str
    '''
    candidate = clean_restricted_chars(generic_filename(path))

    if check_forbidden_filename(candidate, destiny_os=destiny_os):
        return ''

    if isinstance(candidate, bytes):
        candidate = candidate.decode('latin-1', errors=underscore_replace)

    # Round-trip through the filesystem encoding in order to strip unwanted
    # characters out
    codec_options = {
        'os_name': destiny_os,
        'fs_encoding': fs_encoding,
        'errors': underscore_replace,
        }
    encoded = compat.fsencode(candidate, **codec_options)
    return compat.fsdecode(encoded, **codec_options)
673
674
675
def alternative_filename(filename, attempt=None):
    '''
    Build an alternative version of given filename by inserting a marker
    before its (up to two) trailing extensions.

    The marker is the attempt number between parentheses if given, eight
    random filesystem-safe characters otherwise.

    :param filename: original filename
    :param attempt: optional attempt number, defaults to null
    :return: new filename
    :rtype: str or unicode
    '''
    pieces = filename.rsplit(u'.', 2)
    stem, suffixes = pieces[0], pieces[1:]
    if attempt is None:
        token = ''.join(random.choice(fs_safe_characters) for _ in range(8))
        marker = u' %s' % token
    else:
        marker = u' (%d)' % attempt
    suffix = ''.join(u'.%s' % part for part in suffixes)
    return u'%s%s%s' % (stem, marker, suffix)
696