Completed
Push — dev-4.1 ( 3cf8c0...4d900b )
by Felipe A.
01:21
created

Directory.sortkey()   A

Complexity

Conditions 1

Size

Total Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
c 0
b 0
f 0
dl 0
loc 12
rs 9.4285
1
#!/usr/bin/env python
2
# -*- coding: UTF-8 -*-
3
4
import os
5
import os.path
6
import re
7
import shutil
8
import codecs
9
import threading
10
import string
11
import tarfile
12
import random
13
import datetime
14
import logging
15
16
from flask import current_app, send_from_directory
17
from werkzeug.utils import cached_property
18
19
from . import compat
20
from .compat import range
21
22
23
logger = logging.getLogger(__name__)

# Replacement character used by the codec error handler registered below
# (decoded to unicode on legacy python so it matches the handler contract).
unicode_underscore = '_'.decode('utf-8') if compat.PY_LEGACY else '_'

# Name of the codec error handler: every offending position is replaced by a
# single underscore and processing resumes at the following position.
underscore_replace = '%s:underscore' % __name__
codecs.register_error(underscore_replace,
                      lambda error: (unicode_underscore, error.start + 1)
                      )

# Unit suffixes returned by fmt_size, smallest first.
binary_units = ("B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB")
standard_units = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")

# Separators stripped by generic_filename regardless of the running OS.
common_path_separators = '\\/'

# Characters replaced by clean_restricted_chars.
restricted_chars = '\\/\0'

# Filenames rejected by check_forbidden_filename on every OS.
restricted_names = ('.', '..', '::', os.sep)

# Reserved device names rejected by check_forbidden_filename when the
# destination OS is 'nt'. The whole COM1-COM9 and LPT1-LPT9 ranges are
# reserved by Windows, not only the first few ports.
nt_device_names = (
    ('CON', 'PRN', 'AUX', 'NUL') +
    tuple('COM%d' % port for port in range(1, 10)) +
    tuple('LPT%d' % port for port in range(1, 10))
    )

# Alphabet used by alternative_filename for random filename suffixes.
fs_safe_characters = string.ascii_uppercase + string.digits
37
38
39
class Node(object):
    '''
    Base class for filesystem entries.

    While `generic` is True, :meth:`from_urlpath` picks `directory_class` or
    `file_class` based on what is found on the filesystem; subclasses which
    set `generic` to False are always instantiated as themselves.
    '''
    generic = True
    directory_class = None  # set later at import time
    file_class = None  # set later at import time

    re_charset = re.compile('; charset=(?P<charset>[^;]+)')
    can_download = False

    def __init__(self, path=None, app=None, **defaults):
        '''
        :param path: filesystem path, decoded with compat.fsdecode if given
        :param app: optional, flask application (current app if omitted)
        :param defaults: attributes stored directly on the instance
        '''
        if path:
            self.path = compat.fsdecode(path)
        else:
            self.path = None
        if app is None:
            app = current_app
        self.app = app
        # Values given here take the place of same-named cached properties.
        self.__dict__.update(defaults)

    @property
    def plugin_manager(self):
        '''Plugin manager registered on the application extensions.'''
        return self.app.extensions['plugin_manager']

    @cached_property
    def actions(self):
        '''Actions returned by the plugin manager for this node.'''
        return self.plugin_manager.get_actions(self)

    @property
    def default_action(self):
        '''First action whose widget place is 'link', None if there is none.'''
        link_actions = (
            candidate
            for candidate in self.actions
            if candidate.widget.place == 'link'
            )
        return next(link_actions, None)

    @cached_property
    def can_remove(self):
        '''Whether path lies under the configured removable directory.'''
        removable_base = self.app.config["directory_remove"]
        if not removable_base:
            return removable_base
        return self.path.startswith(removable_base + os.sep)

    @cached_property
    def stats(self):
        '''Result of os.stat on path.'''
        return os.stat(self.path)

    @cached_property
    def parent(self):
        '''
        Parent directory node, None for the base directory or when there is
        no path or no parent path.
        '''
        if self.path == self.app.config['directory_base']:
            return None
        if not self.path:
            return None
        parent_path = os.path.dirname(self.path)
        if not parent_path:
            return None
        return self.directory_class(parent_path, self.app)

    @cached_property
    def ancestors(self):
        '''List of parent nodes, nearest first.'''
        lineage = []
        node = self.parent
        while node:
            lineage.append(node)
            node = node.parent
        return lineage

    @property
    def modified(self):
        '''Modification time formatted as 'YYYY.mm.dd HH:MM:SS'.'''
        mtime = self.stats.st_mtime
        return datetime.datetime.fromtimestamp(mtime).strftime(
            '%Y.%m.%d %H:%M:%S'
            )

    @property
    def urlpath(self):
        '''Path relative to the base directory, using '/' as separator.'''
        base = self.app.config['directory_base']
        return abspath_to_urlpath(self.path, base)

    @property
    def name(self):
        '''Last component of path.'''
        return os.path.basename(self.path)

    @property
    def type(self):
        '''Mimetype without parameters (text before the first ';').'''
        pieces = self.mimetype.split(";", 1)
        return pieces[0]

    def remove(self):
        '''
        Check node can be removed; subclasses perform the actual removal.

        :raises OutsideRemovableBase: when not under removable base directory
        '''
        if self.can_remove:
            return
        raise OutsideRemovableBase("File outside removable base")

    @classmethod
    def from_urlpath(cls, path, app=None):
        '''
        Build a node from a path as received on an URL, resolving it against
        the `directory_base` of given app (or the current app).

        When the class is `generic`, the result is a `directory_class`
        instance if the resolved path is a directory and a `file_class`
        instance otherwise; non generic classes instantiate themselves.

        :param path: relative path as from URL
        :param app: optional, flask application
        :return: node object pointing to path
        :rtype: Node
        '''
        app = app or current_app
        abspath = urlpath_to_abspath(path, app.config['directory_base'])
        if not cls.generic:
            return cls(path=abspath, app=app)
        if os.path.isdir(abspath):
            return cls.directory_class(path=abspath, app=app)
        return cls.file_class(path=abspath, app=app)

    @classmethod
    def register_file_class(cls, kls):
        '''Class decorator: use given class as `file_class`.'''
        cls.file_class = kls
        return kls

    @classmethod
    def register_directory_class(cls, kls):
        '''Class decorator: use given class as `directory_class`.'''
        cls.directory_class = kls
        return kls
146
147
148
@Node.register_file_class
class File(Node):
    '''
    Filesystem file node.
    '''
    can_download = True
    can_upload = False
    is_directory = False
    generic = False

    @property
    def default_action(self):
        '''
        Link action given by plugins, falling back to an 'open' action built
        with the plugin manager link and action classes.
        '''
        inherited = super(File, self).default_action
        if inherited is not None:
            return inherited
        manager = self.plugin_manager
        return manager.action_class('open', manager.link_class.from_file(self))

    @cached_property
    def mimetype(self):
        '''Mimetype reported by the plugin manager for path.'''
        return self.plugin_manager.get_mimetype(self.path)

    @cached_property
    def is_file(self):
        '''Whether path points to a regular file.'''
        return os.path.isfile(self.path)

    @property
    def size(self):
        '''
        Human readable size: integer for the smallest unit, two decimals
        for any other unit.
        '''
        amount, unit = fmt_size(
            self.stats.st_size,
            self.app.config["use_binary_multiples"]
            )
        template = "%d %s" if unit == binary_units[0] else "%.2f %s"
        return template % (amount, unit)

    @property
    def encoding(self):
        '''Charset parameter from mimetype, "default" if not available.'''
        mimetype = self.mimetype
        if ";" not in mimetype:
            return "default"
        found = self.re_charset.search(mimetype)
        if not found:
            return "default"
        return found.group("charset") or "default"

    def remove(self):
        '''
        Delete file from filesystem.

        :raises OutsideRemovableBase: when not under removable base directory
        '''
        super(File, self).remove()
        os.unlink(self.path)

    def download(self):
        '''
        Serve this file as attachment using flask's send_from_directory.

        :returns: Response object as returned by send_from_directory
        :rtype: flask.Response
        '''
        return send_from_directory(
            os.path.dirname(self.path),
            os.path.basename(self.path),
            as_attachment=True
            )
206
207
208
@Node.register_directory_class
class Directory(Node):
    '''
    Filesystem directory node.

    Listing results are cached on the instance by :meth:`listdir`.
    '''
    _listdir_cache = None
    mimetype = 'inode/directory'
    is_directory = True
    is_file = False
    size = 0
    encoding = 'default'
    generic = False

    @property
    def default_action(self):
        '''
        Link action given by plugins, falling back to a 'browse' action built
        with the plugin manager link and action classes.
        '''
        action = super(Directory, self).default_action
        if action is None:
            widget = self.plugin_manager.link_class.from_file(self)
            action = self.plugin_manager.action_class('browse', widget)
        return action

    @cached_property
    def can_download(self):
        '''Value of the `directory_downloadable` app config option.'''
        return self.app.config['directory_downloadable']

    @cached_property
    def can_upload(self):
        '''
        Whether directory is the configured upload directory or lies under
        it (falsy when `directory_upload` is not configured).
        '''
        dirbase = self.app.config["directory_upload"]
        return dirbase and (
            dirbase == self.path or
            self.path.startswith(dirbase + os.sep)
            )

    @cached_property
    def is_empty(self):
        '''
        Whether directory has no entries.

        :rtype: bool
        '''
        if self._listdir_cache is not None:
            # An already cached listing answers without touching the
            # filesystem: the directory is empty when that list is empty.
            return not self._listdir_cache
        for entry in self._listdir():
            # A single entry is enough, stop iterating as soon as possible.
            return False
        return True

    def remove(self):
        '''
        Remove directory tree.

        :raises OutsideRemovableBase: when not under removable base directory
        '''
        super(Directory, self).remove()
        shutil.rmtree(self.path)

    def download(self):
        '''
        Get a Flask Response object streaming a tarball of this directory.

        :returns: Response object
        :rtype: flask.Response
        '''
        return self.app.response_class(
            TarFileStream(
                self.path,
                self.app.config["directory_tar_buffsize"]
                ),
            mimetype="application/octet-stream"
            )

    def contains(self, filename):
        '''
        Check if directory contains an entry with given filename.

        :param filename: filename to check
        :type filename: str
        :returns: True if exists, False otherwise.
        :rtype: bool
        '''
        return os.path.exists(os.path.join(self.path, filename))

    def choose_filename(self, filename, attempts=999):
        '''
        Get a new filename which does not collide with any entry on
        directory, based on given filename.

        Numbered alternatives (starting at 2) are tried first, random
        alternatives are used once those attempts are exhausted.

        :param filename: base filename
        :type filename: str
        :param attempts: number of attempts, defaults to 999
        :type attempts: int
        :returns: filename
        :rtype: str
        '''
        new_filename = filename
        for attempt in range(2, attempts + 1):
            if not self.contains(new_filename):
                return new_filename
            new_filename = alternative_filename(filename, attempt)
        while self.contains(new_filename):
            new_filename = alternative_filename(filename)
        return new_filename

    def _listdir(self):
        '''
        Iter unsorted entries on this directory.

        :yields: Directory or File instance for each entry in directory
        :ytype: Node
        '''
        # On nt the stat result of non-symlink scandir entries is handed to
        # the node, so its `stats` property needs no extra os.stat call.
        precomputed_stats = os.name == 'nt'
        for entry in compat.scandir(self.path):
            kwargs = {'path': entry.path, 'app': self.app, 'parent': self}
            if precomputed_stats and not entry.is_symlink():
                # scandir entries expose `stat()` (there is no `stats()`).
                kwargs['stats'] = entry.stat()
            if entry.is_dir(follow_symlinks=True):
                yield self.directory_class(**kwargs)
                continue
            yield self.file_class(**kwargs)

    def listdir(self, sortkey=None, reverse=False):
        '''
        Get list of Directory and File objects for entries on this directory.

        The list built on first call is cached: further calls return that
        same list, whatever `sortkey` and `reverse` they receive.

        :param sortkey: optional key function, entries are left in
                        filesystem order when omitted
        :param reverse: whether the resulting order should be reversed
        :return: list of Directory and File instances
        :rtype: list of Node
        '''
        if self._listdir_cache is None:
            if sortkey:
                data = sorted(self._listdir(), key=sortkey, reverse=reverse)
            else:
                data = list(self._listdir())
                if reverse:
                    # _listdir is a generator, which reversed() rejects, so
                    # the materialized list is reversed instead.
                    data.reverse()
            self._listdir_cache = data
        return self._listdir_cache
336
337
338
class TarFileStream(object):
    '''
    Tarfile which compresses while reading for streaming.

    Buffsize can be provided, it must be 512 multiple (the tar block size) for
    compression.

    The archive is written by a worker thread using this object as the
    tarfile output: :meth:`write` blocks until :meth:`read` asks for data,
    and :meth:`read` blocks until enough data has been written (or the
    worker is done).
    '''
    event_class = threading.Event
    thread_class = threading.Thread
    tarfile_class = tarfile.open

    def __init__(self, path, buffsize=10240):
        '''
        Start archiving given path on a worker thread.

        :param path: path to archive
        :param buffsize: buffer size passed to tarfile, defaults to 10240
        '''
        self.path = path
        self.name = os.path.basename(path) + ".tgz"

        self._finished = 0  # 0: working, 1: done, 2: end already reported
        self._want = 0  # amount of bytes requested by the last read call
        self._data = bytes()  # written but not yet read data
        self._add = self.event_class()  # set when read is waiting for data
        self._result = self.event_class()  # set when read can take data
        self._tarfile = self.tarfile_class(  # stream write
            fileobj=self,
            mode="w|gz",
            bufsize=buffsize
            )
        self._th = self.thread_class(target=self.fill)
        self._th.start()

    def fill(self):
        '''
        Worker thread target: archive path and signal completion.
        '''
        try:
            self._tarfile.add(self.path, "")
            self._tarfile.close()  # force stream flush
        finally:
            # Completion is signalled even when archiving fails, otherwise
            # a reader waiting on `_result` would block forever.
            self._finished += 1
            if not self._result.is_set():
                self._result.set()

    def write(self, data):
        '''
        Store data written by tarfile, waiting until a read requests it.

        :param data: compressed archive chunk
        :type data: bytes
        :returns: number of bytes written
        :rtype: int
        '''
        self._add.wait()
        self._data += data
        if len(self._data) > self._want:
            self._add.clear()
            self._result.set()
        return len(data)

    def read(self, want=0):
        '''
        Read archive data, blocking until the worker thread provides it.

        Once the worker is done, one empty bytes value is returned to mark
        the end of the stream; any further call raises EOFError.

        :param want: maximum number of bytes, 0 for all available data
        :type want: int
        :returns: archive data, empty at the end of the stream
        :rtype: bytes
        :raises EOFError: if called again after the end was reported
        '''
        if self._finished:
            if self._finished == 1:
                self._finished += 1
                # Same type as regular data, so callers can join chunks.
                return bytes()
            raise EOFError("EOF reached")

        # Thread communication
        self._want = want
        self._add.set()
        self._result.wait()
        self._result.clear()

        if want:
            data = self._data[:want]
            self._data = self._data[want:]
        else:
            data = self._data
            self._data = bytes()
        return data

    def __iter__(self):
        '''
        Iterate over archive data chunks until an empty one is read.
        '''
        data = self.read()
        while data:
            yield data
            data = self.read()
407
408
409
class OutsideDirectoryBase(Exception):
    '''
    Exception raised when a path is not under the base directory.
    '''
411
412
413
class OutsideRemovableBase(Exception):
    '''
    Exception raised when removing a node which is not under the removable
    base directory.
    '''
415
416
417
def fmt_size(size, binary=True):
    '''
    Scale a size in bytes to the first unit in which it is below 1000.

    The 1000 threshold applies to both unit families, only the divider
    changes (1024 for binary units, 1000 for standard ones). The largest
    unit is used when no smaller one fits.

    :param size: size in bytes
    :param binary: whether use binary or standard units, defaults to True
    :return: size and unit
    :rtype: tuple of number and unit as str
    '''
    units, divider = (
        (binary_units, 1024.) if binary else (standard_units, 1000.)
        )
    largest = len(units) - 1
    position = 0
    while position < largest and not size < 1000:
        size /= divider
        position += 1
    return size, units[position]
437
438
439
def relativize_path(path, base, os_sep=os.sep):
    '''
    Strip an absolute base (and the separator following it) from an
    absolute path.

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: relative path
    :rtype: str or unicode
    :raises OutsideDirectoryBase: if path is not below base
    '''
    if check_under_base(path, base, os_sep):
        skip = len(base)
        if not base.endswith(os_sep):
            # The separator between base and the rest is dropped too.
            skip = skip + len(os_sep)
        return path[skip:]
    raise OutsideDirectoryBase("%r is not under %r" % (path, base))
456
457
458
def abspath_to_urlpath(path, base, os_sep=os.sep):
    '''
    Convert an absolute filesystem path into a path relative to base which
    uses '/' as component separator.

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: relative uri
    :rtype: str or unicode
    :raises OutsideDirectoryBase: if resulting path is not below base
    '''
    relative = relativize_path(path, base, os_sep)
    return relative.replace(os_sep, '/')
470
471
472
def urlpath_to_abspath(path, base, os_sep=os.sep):
    '''
    Resolve an uri relative path against an absolute base path, refusing
    results which end outside of that base.

    :param path: relative path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: absolute path
    :rtype: str or unicode
    :raises OutsideDirectoryBase: if resulting path is not below base
    '''
    if base.endswith(os_sep):
        root = base
    else:
        root = base + os_sep
    # abspath normalizes the joined path, collapsing any '..' component.
    candidate = os.path.abspath(root + path.replace('/', os_sep))
    if candidate != base and not candidate.startswith(root):
        raise OutsideDirectoryBase("%r is not under %r" % (candidate, base))
    return candidate
488
489
490
def generic_filename(path):
    '''
    Get the last component of given path, whatever known path separator it
    uses (independently of the current OS).

    :param path: path
    :return: filename
    :rtype: str or unicode (depending on given path)
    '''
    filename = path
    for separator in common_path_separators:
        # Keeps the text after the last separator, or everything if absent.
        filename = filename.rsplit(separator, 1)[-1]
    return filename
504
505
506
def clean_restricted_chars(path, restricted_chars=restricted_chars):
    '''
    Replace every restricted character on path with an underscore.

    :param path: path
    :param restricted_chars: characters to replace
    :return: path without restricted characters
    :rtype: str or unicode (depending on given path)
    '''
    cleaned = path
    for forbidden in restricted_chars:
        cleaned = cleaned.replace(forbidden, '_')
    return cleaned
517
518
519
def check_forbidden_filename(filename,
                             destiny_os=os.name,
                             restricted_names=restricted_names):
    '''
    Get if given filename is forbidden for given OS.

    For 'nt', the text before the first dot is compared (case-insensitive)
    against the known device names; for every OS the whole filename is
    looked up on the restricted names.

    :param filename: filename to check
    :param destiny_os: destination operative system
    :param restricted_names: filenames forbidden on any OS
    :return: whether is forbidden on given OS or not
    :rtype: bool
    '''
    targets_nt = destiny_os == 'nt'
    if targets_nt and filename.split('.', 1)[0].upper() in nt_device_names:
        return True
    return filename in restricted_names
537
538
539
def check_under_base(path, base, os_sep=os.sep):
    '''
    Get whether given absolute path is the given base itself or lies
    below it.

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: whether path is under given base or not
    :rtype: bool
    '''
    if base.endswith(os_sep):
        prefix = base
    else:
        # Without the separator '/base-other' would match base '/base'.
        prefix = base + os_sep
    if path == base:
        return True
    return path.startswith(prefix)
550
551
552
def secure_filename(path, destiny_os=os.name, fs_encoding=compat.fs_encoding):
    '''
    Reduce an unsafe path to a bare filename: parent path components are
    dropped and restricted characters are replaced.

    If resulting filename is forbidden, return empty string.

    :param path: unsafe path
    :type: str
    :param destiny_os: destination operative system
    :type destiny_os: str
    :param fs_encoding: destination filesystem filename encoding
    :type fs_encoding: str
    :return: filename or empty string
    :rtype: str
    '''
    filename = clean_restricted_chars(generic_filename(path))

    if check_forbidden_filename(filename, destiny_os=destiny_os):
        return ''

    if isinstance(filename, bytes):
        filename = filename.decode('latin-1', errors=underscore_replace)

    # Encoding to the filesystem encoding and back turns characters it
    # cannot represent into underscores.
    roundtrip_options = {
        'os_name': destiny_os,
        'fs_encoding': fs_encoding,
        'errors': underscore_replace,
        }
    encoded = compat.fsencode(filename, **roundtrip_options)
    return compat.fsdecode(encoded, **roundtrip_options)
584
585
586
def alternative_filename(filename, attempt=None):
    '''
    Build a variation of given filename by inserting a marker between its
    name and its (up to two) extensions.

    The marker is the attempt number in parentheses if given, eight random
    filesystem-safe characters otherwise.

    :param filename: original filename
    :param attempt: optional attempt number, defaults to null
    :return: new filename
    :rtype: str or unicode
    '''
    pieces = filename.rsplit(u'.', 2)
    stem, suffixes = pieces[0], pieces[1:]
    if attempt is not None:
        marker = u' (%d)' % attempt
    else:
        token = ''.join(random.choice(fs_safe_characters) for _ in range(8))
        marker = u' %s' % token
    extension = ''.join(u'.%s' % suffix for suffix in suffixes)
    return u'%s%s%s' % (stem, marker, extension)
607