Completed
Push — dev-4.1 ( d7c5dd...136355 )
by Felipe A.
02:06 queued 46s
created

Directory.is_directory()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
#!/usr/bin/env python
2
# -*- coding: UTF-8 -*-
3
4
import os
5
import os.path
6
import re
7
import shutil
8
import codecs
9
import threading
10
import string
11
import tarfile
12
import random
13
import datetime
14
import logging
15
16
from flask import current_app, send_from_directory
17
from werkzeug.utils import cached_property
18
19
from . import compat
20
from .compat import range
21
22
23
logger = logging.getLogger(__name__)
24
unicode_underscore = '_'.decode('utf-8') if compat.PY_LEGACY else '_'
25
underscore_replace = '%s:underscore' % __name__
26
codecs.register_error(underscore_replace,
27
                      lambda error: (unicode_underscore, error.start + 1)
28
                      )
29
binary_units = ("B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB")
30
standard_units = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
31
common_path_separators = '\\/'
32
restricted_chars = '\\/\0'
33
restricted_names = ('.', '..', '::', os.sep)
34
nt_device_names = ('CON', 'AUX', 'COM1', 'COM2', 'COM3', 'COM4', 'LPT1',
35
                   'LPT2', 'LPT3', 'PRN', 'NUL')
36
fs_safe_characters = string.ascii_uppercase + string.digits
37
38
39
class Node(object):
40
    generic = True
41
    directory_class = None  # set later at import time
42
    file_class = None  # set later at import time
43
44
    re_charset = re.compile('; charset=(?P<charset>[^;]+)')
45
    can_download = False
46
47
    @property
48
    def plugin_manager(self):
49
        return self.app.extensions['plugin_manager']
50
51
    @property
52
    def default_action(self):
53
        for action in self.actions:
54
            if action.widget.place == 'link':
55
                return action
56
57
    @cached_property
58
    def actions(self):
59
        return self.plugin_manager.get_actions(self)
60
61
    @cached_property
62
    def can_remove(self):
63
        dirbase = self.app.config["directory_remove"]
64
        return dirbase and self.path.startswith(dirbase + os.sep)
65
66
    @cached_property
67
    def stats(self):
68
        return os.stat(self.path)
69
70
    @cached_property
71
    def parent(self):
72
        if self.path == self.app.config['directory_base']:
73
            return None
74
        parent = os.path.dirname(self.path) if self.path else None
75
        return self.directory_class(parent, self.app) if parent else None
76
77
    @cached_property
78
    def ancestors(self):
79
        ancestors = []
80
        parent = self.parent
81
        while parent:
82
            ancestors.append(parent)
83
            parent = parent.parent
84
        return ancestors
85
86
    @property
87
    def modified(self):
88
        dt = datetime.datetime.fromtimestamp(self.stats.st_mtime)
89
        return dt.strftime('%Y.%m.%d %H:%M:%S')
90
91
    @property
92
    def urlpath(self):
93
        return abspath_to_urlpath(self.path, self.app.config['directory_base'])
94
95
    @property
96
    def name(self):
97
        return os.path.basename(self.path)
98
99
    @property
100
    def type(self):
101
        return self.mimetype.split(";", 1)[0]
102
103
    def __init__(self, path=None, app=None, **defaults):
104
        self.path = compat.fsdecode(path) if path else None
105
        self.app = current_app if app is None else app
106
        self.__dict__.update(defaults)
107
108
    def remove(self):
109
        if not self.can_remove:
110
            raise OutsideRemovableBase("File outside removable base")
111
112
    @classmethod
113
    def from_urlpath(cls, path, app=None):
114
        '''
115
        Alternative constructor which accepts a path as taken from URL and uses
116
        the given app or the current app config to get the real path.
117
118
        If class has attribute `generic` set to True, `directory_class` or
119
        `file_class` will be used as type.
120
121
        :param path: relative path as from URL
122
        :param app: optional, flask application
123
        :return: file object pointing to path
124
        :rtype: File
125
        '''
126
        app = app or current_app
127
        base = app.config['directory_base']
128
        path = urlpath_to_abspath(path, base)
129
        if not cls.generic:
130
            kls = cls
131
        elif os.path.isdir(path):
132
            kls = cls.directory_class
133
        else:
134
            kls = cls.file_class
135
        return kls(path=path, app=app)
136
137
    @classmethod
138
    def register_file_class(cls, kls):
139
        cls.file_class = kls
140
        return kls
141
142
    @classmethod
143
    def register_directory_class(cls, kls):
144
        cls.directory_class = kls
145
        return kls
146
147
148
@Node.register_file_class
149
class File(Node):
150
    can_download = True
151
    can_upload = False
152
    is_directory = False
153
    generic = False
154
155
    @property
156
    def default_action(self):
157
        action = super(File, self).default_action
158
        if action is None:
159
            widget = self.plugin_manager.link_class.from_file(self)
160
            action = self.plugin_manager.action_class('open', widget)
161
        return action
162
163
    @cached_property
164
    def mimetype(self):
165
        return self.plugin_manager.get_mimetype(self.path)
166
167
    @cached_property
168
    def is_file(self):
169
        return os.path.isfile(self.path)
170
171
    @property
172
    def size(self):
173
        size, unit = fmt_size(
174
            self.stats.st_size,
175
            self.app.config["use_binary_multiples"]
176
            )
177
        if unit == binary_units[0]:
178
            return "%d %s" % (size, unit)
179
        return "%.2f %s" % (size, unit)
180
181
    @property
182
    def encoding(self):
183
        if ";" in self.mimetype:
184
            match = self.re_charset.search(self.mimetype)
185
            gdict = match.groupdict() if match else {}
186
            return gdict.get("charset") or "default"
187
        return "default"
188
189
    def remove(self):
190
        '''
191
        Remove file.
192
        :raises OutsideRemovableBase: when not under removable base directory
193
        '''
194
        super(File, self).remove()
195
        os.unlink(self.path)
196
197
    def download(self):
198
        '''
199
        Get a Flask's send_file Response object pointing to this file.
200
201
        :returns: Response object as returned by flask's send_file
202
        :rtype: flask.Response
203
        '''
204
        directory, name = os.path.split(self.path)
205
        return send_from_directory(directory, name, as_attachment=True)
206
207
208
@Node.register_directory_class
209
class Directory(Node):
210
    _listdir_cache = None
211
    mimetype = 'inode/directory'
212
    is_file = False
213
    size = 0
214
    encoding = 'default'
215
    generic = False
216
217
    @property
218
    def default_action(self):
219
        action = super(Directory, self).default_action
220
        if action is None:
221
            widget = self.plugin_manager.link_class.from_file(self)
222
            action = self.plugin_manager.action_class('browse', widget)
223
        return action
224
225
    @cached_property
226
    def is_directory(self):
227
        return os.path.isdir(self.path)
228
229
    @cached_property
230
    def can_download(self):
231
        return self.app.config['directory_downloadable']
232
233
    @cached_property
234
    def can_upload(self):
235
        dirbase = self.app.config["directory_upload"]
236
        return dirbase and (
237
            dirbase == self.path or
238
            self.path.startswith(dirbase + os.sep)
239
            )
240
241
    @cached_property
242
    def is_empty(self):
243
        if self._listdir_cache is not None:
244
            return bool(self._listdir_cache)
245
        for entry in self._listdir():
246
            return False
247
        return True
248
249
    def remove(self):
250
        '''
251
        Remove directory tree.
252
253
        :raises OutsideRemovableBase: when not under removable base directory
254
        '''
255
        super(Directory, self).remove()
256
        shutil.rmtree(self.path)
257
258
    def download(self):
259
        '''
260
        Get a Flask Response object streaming a tarball of this directory.
261
262
        :returns: Response object
263
        :rtype: flask.Response
264
        '''
265
        return self.app.response_class(
266
            TarFileStream(
267
                self.path,
268
                self.app.config["directory_tar_buffsize"]
269
                ),
270
            mimetype="application/octet-stream"
271
            )
272
273
    def contains(self, filename):
274
        '''
275
        Check if directory contains an entry with given filename.
276
277
        :param filename: filename will be check
278
        :type filename: str
279
        :returns: True if exists, False otherwise.
280
        :rtype: bool
281
        '''
282
        return os.path.exists(os.path.join(self.path, filename))
283
284
    def choose_filename(self, filename, attempts=999):
285
        '''
286
        Get a new filename which does not colide with any entry on directory,
287
        based on given filename.
288
289
        :param filename: base filename
290
        :type filename: str
291
        :param attempts: number of attempts, defaults to 999
292
        :type attempts: int
293
        :returns: filename
294
        :rtype: str
295
        '''
296
        new_filename = filename
297
        for attempt in range(2, attempts + 1):
298
            if not self.contains(new_filename):
299
                return new_filename
300
            new_filename = alternative_filename(filename, attempt)
301
        while self.contains(new_filename):
302
            new_filename = alternative_filename(filename)
303
        return new_filename
304
305
    def _listdir(self):
306
        '''
307
        Iter unsorted entries on this directory.
308
309
        :yields: Directory or File instance for each entry in directory
310
        :ytype: Node
311
        '''
312
        precomputed_stats = os.name == 'nt'
313
        for entry in compat.scandir(self.path):
314
            kwargs = {'path': entry.path, 'app': self.app, 'parent': self}
315
            if precomputed_stats and not entry.is_symlink():
316
                kwargs['stats'] = entry.stats()
317
            if entry.is_dir(follow_symlinks=True):
318
                yield self.directory_class(**kwargs)
319
                continue
320
            yield self.file_class(**kwargs)
321
322
    def listdir(self, sortkey=None, reverse=False):
323
        '''
324
        Get sorted list (by `is_directory` and `name` properties) of File
325
        objects.
326
327
        :return: sorted list of File instances
328
        :rtype: list of File
329
        '''
330
        if self._listdir_cache is None:
331
            if sortkey:
332
                data = sorted(self._listdir(), key=sortkey, reverse=reverse)
333
            elif reverse:
334
                data = list(reversed(self._listdir()))
335
            else:
336
                data = list(self._listdir())
337
            self._listdir_cache = data
338
        return self._listdir_cache
339
340
341
class TarFileStream(object):
342
    '''
343
    Tarfile which compresses while reading for streaming.
344
345
    Buffsize can be provided, it must be 512 multiple (the tar block size) for
346
    compression.
347
    '''
348
    event_class = threading.Event
349
    thread_class = threading.Thread
350
    tarfile_class = tarfile.open
351
352
    def __init__(self, path, buffsize=10240):
353
        self.path = path
354
        self.name = os.path.basename(path) + ".tgz"
355
356
        self._finished = 0
357
        self._want = 0
358
        self._data = bytes()
359
        self._add = self.event_class()
360
        self._result = self.event_class()
361
        self._tarfile = self.tarfile_class(  # stream write
362
            fileobj=self,
363
            mode="w|gz",
364
            bufsize=buffsize
365
            )
366
        self._th = self.thread_class(target=self.fill)
367
        self._th.start()
368
369
    def fill(self):
370
        self._tarfile.add(self.path, "")
371
        self._tarfile.close()  # force stream flush
372
        self._finished += 1
373
        if not self._result.is_set():
374
            self._result.set()
375
376
    def write(self, data):
377
        self._add.wait()
378
        self._data += data
379
        if len(self._data) > self._want:
380
            self._add.clear()
381
            self._result.set()
382
        return len(data)
383
384
    def read(self, want=0):
385
        if self._finished:
386
            if self._finished == 1:
387
                self._finished += 1
388
                return ""
389
            return EOFError("EOF reached")
390
391
        # Thread communication
392
        self._want = want
393
        self._add.set()
394
        self._result.wait()
395
        self._result.clear()
396
397
        if want:
398
            data = self._data[:want]
399
            self._data = self._data[want:]
400
        else:
401
            data = self._data
402
            self._data = bytes()
403
        return data
404
405
    def __iter__(self):
406
        data = self.read()
407
        while data:
408
            yield data
409
            data = self.read()
410
411
412
class OutsideDirectoryBase(Exception):
413
    pass
414
415
416
class OutsideRemovableBase(Exception):
417
    pass
418
419
420
def fmt_size(size, binary=True):
421
    '''
422
    Get size and unit.
423
424
    :param size: size in bytes
425
    :param binary: whether use binary or standard units, defaults to True
426
    :return: size and unit
427
    :rtype: tuple of int and unit as str
428
    '''
429
    if binary:
430
        fmt_sizes = binary_units
431
        fmt_divider = 1024.
432
    else:
433
        fmt_sizes = standard_units
434
        fmt_divider = 1000.
435
    for fmt in fmt_sizes[:-1]:
436
        if size < 1000:
437
            return (size, fmt)
438
        size /= fmt_divider
439
    return size, fmt_sizes[-1]
440
441
442
def relativize_path(path, base, os_sep=os.sep):
443
    '''
444
    Make absolute path relative to an absolute base.
445
446
    :param path: absolute path
447
    :param base: absolute base path
448
    :param os_sep: path component separator, defaults to current OS separator
449
    :return: relative path
450
    :rtype: str or unicode
451
    :raises OutsideDirectoryBase: if path is not below base
452
    '''
453
    if not check_under_base(path, base, os_sep):
454
        raise OutsideDirectoryBase("%r is not under %r" % (path, base))
455
    prefix_len = len(base)
456
    if not base.endswith(os_sep):
457
        prefix_len += len(os_sep)
458
    return path[prefix_len:]
459
460
461
def abspath_to_urlpath(path, base, os_sep=os.sep):
462
    '''
463
    Make filesystem absolute path uri relative using given absolute base path.
464
465
    :param path: absolute path
466
    :param base: absolute base path
467
    :param os_sep: path component separator, defaults to current OS separator
468
    :return: relative uri
469
    :rtype: str or unicode
470
    :raises OutsideDirectoryBase: if resulting path is not below base
471
    '''
472
    return relativize_path(path, base, os_sep).replace(os_sep, '/')
473
474
475
def urlpath_to_abspath(path, base, os_sep=os.sep):
476
    '''
477
    Make uri relative path fs absolute using a given absolute base path.
478
479
    :param path: relative path
480
    :param base: absolute base path
481
    :param os_sep: path component separator, defaults to current OS separator
482
    :return: absolute path
483
    :rtype: str or unicode
484
    :raises OutsideDirectoryBase: if resulting path is not below base
485
    '''
486
    prefix = base if base.endswith(os_sep) else base + os_sep
487
    realpath = os.path.abspath(prefix + path.replace('/', os_sep))
488
    if base == realpath or realpath.startswith(prefix):
489
        return realpath
490
    raise OutsideDirectoryBase("%r is not under %r" % (realpath, base))
491
492
493
def generic_filename(path):
494
    '''
495
    Extract filename of given path os-indepently, taking care of known path
496
    separators.
497
498
    :param path: path
499
    :return: filename
500
    :rtype: str or unicode (depending on given path)
501
    '''
502
503
    for sep in common_path_separators:
504
        if sep in path:
505
            _, path = path.rsplit(sep, 1)
506
    return path
507
508
509
def clean_restricted_chars(path, restricted_chars=restricted_chars):
510
    '''
511
    Get path without restricted characters.
512
513
    :param path: path
514
    :return: path without restricted characters
515
    :rtype: str or unicode (depending on given path)
516
    '''
517
    for character in restricted_chars:
518
        path = path.replace(character, '_')
519
    return path
520
521
522
def check_forbidden_filename(filename,
523
                             destiny_os=os.name,
524
                             restricted_names=restricted_names):
525
    '''
526
    Get if given filename is forbidden for current OS or filesystem.
527
528
    :param filename:
529
    :param destiny_os: destination operative system
530
    :param fs_encoding: destination filesystem filename encoding
531
    :return: wether is forbidden on given OS (or filesystem) or not
532
    :rtype: bool
533
    '''
534
    if destiny_os == 'nt':
535
        fpc = filename.split('.', 1)[0].upper()
536
        if fpc in nt_device_names:
537
            return True
538
539
    return filename in restricted_names
540
541
542
def check_under_base(path, base, os_sep=os.sep):
543
    '''
544
    Check if given absolute path is under given base.
545
546
    :param path: absolute path
547
    :param base: absolute base path
548
    :return: wether file is under given base or not
549
    :rtype: bool
550
    '''
551
    prefix = base if base.endswith(os_sep) else base + os_sep
552
    return path == base or path.startswith(prefix)
553
554
555
def secure_filename(path, destiny_os=os.name, fs_encoding=compat.fs_encoding):
556
    '''
557
    Get rid of parent path components and special filenames.
558
559
    If path is invalid or protected, return empty string.
560
561
    :param path: unsafe path
562
    :type: str
563
    :param destiny_os: destination operative system
564
    :type destiny_os: str
565
    :return: filename or empty string
566
    :rtype: str
567
    '''
568
    path = generic_filename(path)
569
    path = clean_restricted_chars(path)
570
571
    if check_forbidden_filename(path, destiny_os=destiny_os):
572
        return ''
573
574
    if isinstance(path, bytes):
575
        path = path.decode('latin-1', errors=underscore_replace)
576
577
    # Decode and recover from filesystem encoding in order to strip unwanted
578
    # characters out
579
    kwargs = dict(
580
        os_name=destiny_os,
581
        fs_encoding=fs_encoding,
582
        errors=underscore_replace
583
        )
584
    fs_encoded_path = compat.fsencode(path, **kwargs)
585
    fs_decoded_path = compat.fsdecode(fs_encoded_path, **kwargs)
586
    return fs_decoded_path
587
588
589
def alternative_filename(filename, attempt=None):
590
    '''
591
    Generates an alternative version of given filename.
592
593
    If an number attempt parameter is given, will be used on the alternative
594
    name, a random value will be used otherwise.
595
596
    :param filename: original filename
597
    :param attempt: optional attempt number, defaults to null
598
    :return: new filename
599
    :rtype: str or unicode
600
    '''
601
    filename_parts = filename.rsplit(u'.', 2)
602
    name = filename_parts[0]
603
    ext = ''.join(u'.%s' % ext for ext in filename_parts[1:])
604
    if attempt is None:
605
        choose = random.choice
606
        extra = u' %s' % ''.join(choose(fs_safe_characters) for i in range(8))
607
    else:
608
        extra = u' (%d)' % attempt
609
    return u'%s%s%s' % (name, extra, ext)
610