Completed
Push — dev-4.1 ( c25dff...77a456 )
by Felipe A.
01:14
created

Directory.sortkey()   A

Complexity

Conditions 1

Size

Total Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
c 0
b 0
f 0
dl 0
loc 12
rs 9.4285
1
#!/usr/bin/env python
2
# -*- coding: UTF-8 -*-
3
4
import os
5
import os.path
6
import re
7
import shutil
8
import codecs
9
import threading
10
import string
11
import tarfile
12
import random
13
import datetime
14
import logging
15
16
from flask import current_app, send_from_directory
17
from werkzeug.utils import cached_property
18
19
from . import compat
20
from .compat import range
21
22
23
logger = logging.getLogger(__name__)

# Replacement character produced by the codec error handler registered below
# (decoded to unicode on legacy Python so it can be mixed with unicode text).
unicode_underscore = '_'.decode('utf-8') if compat.PY_LEGACY else '_'

# Name of the codec error handler: every offending position is replaced by an
# underscore and processing resumes one position after the error start.
underscore_replace = '%s:underscore' % __name__
codecs.register_error(underscore_replace,
                      lambda error: (unicode_underscore, error.start + 1)
                      )

# Unit suffixes used by `fmt_size`, smallest first.
binary_units = ("B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB")
standard_units = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")

# Path separators recognised regardless of the running operating system.
common_path_separators = '\\/'

# Characters replaced by an underscore by `clean_restricted_chars`.
restricted_chars = '\\/\0'

# Filenames rejected on every operating system.
restricted_names = ('.', '..', '::', os.sep)

# Device names reserved by Windows. The serial and parallel port families run
# from 1 to 9, so all of them are listed (the previous tuple stopped at COM4
# and LPT3, letting e.g. "COM5.txt" through `check_forbidden_filename`).
nt_device_names = (
    'CON', 'PRN', 'AUX', 'NUL',
    'COM1', 'COM2', 'COM3', 'COM4', 'COM5', 'COM6', 'COM7', 'COM8', 'COM9',
    'LPT1', 'LPT2', 'LPT3', 'LPT4', 'LPT5', 'LPT6', 'LPT7', 'LPT8', 'LPT9',
    )

# Alphabet used to build random filename suffixes.
fs_safe_characters = string.ascii_uppercase + string.digits
37
38
39
class Node(object):
    '''
    Base class for filesystem nodes.

    Subclasses register themselves with :meth:`register_file_class` and
    :meth:`register_directory_class` so that generic constructors can pick
    the right concrete type.
    '''
    generic = True
    directory_class = None  # set later at import time
    file_class = None  # set later at import time

    re_charset = re.compile('; charset=(?P<charset>[^;]+)')
    can_download = False

    def __init__(self, path=None, app=None, **defaults):
        '''
        :param path: filesystem path, decoded with `compat.fsdecode` if given
        :param app: optional flask application, defaults to the current one
        :param defaults: extra attributes stored directly on the instance
        '''
        self.path = compat.fsdecode(path) if path else None
        self.app = current_app if app is None else app
        self.__dict__.update(defaults)

    @property
    def plugin_manager(self):
        '''Plugin manager stored in the application extensions.'''
        return self.app.extensions['plugin_manager']

    @cached_property
    def actions(self):
        '''Actions the plugin manager returns for this node.'''
        return self.plugin_manager.get_actions(self)

    @property
    def default_action(self):
        '''First action whose widget place is ``link``, None if there is none.'''
        return next(
            (
                candidate
                for candidate in self.actions
                if candidate.widget.place == 'link'
                ),
            None
            )

    @cached_property
    def can_remove(self):
        '''
        Truthy when the node lies strictly below the configured
        ``directory_remove`` base; the falsy config value itself otherwise.
        '''
        removable_base = self.app.config["directory_remove"]
        if not removable_base:
            return removable_base
        return self.path.startswith(removable_base + os.sep)

    @cached_property
    def stats(self):
        '''Result of `os.stat` on the node path.'''
        return os.stat(self.path)

    @cached_property
    def parent(self):
        '''
        Parent directory node, or None when this node is the configured
        ``directory_base`` or has no usable parent path.
        '''
        if self.path == self.app.config['directory_base']:
            return None
        if not self.path:
            return None
        dirname = os.path.dirname(self.path)
        if not dirname:
            return None
        return self.directory_class(dirname, self.app)

    @cached_property
    def ancestors(self):
        '''List of parent nodes, nearest first.'''
        chain = []
        current = self.parent
        while current:
            chain.append(current)
            current = current.parent
        return chain

    @property
    def modified(self):
        '''Modification time formatted as ``YYYY.MM.DD HH:MM:SS``.'''
        mtime = self.stats.st_mtime
        return datetime.datetime.fromtimestamp(mtime).strftime(
            '%Y.%m.%d %H:%M:%S'
            )

    @property
    def urlpath(self):
        '''Path relative to ``directory_base`` using url separators.'''
        base = self.app.config['directory_base']
        return abspath_to_urlpath(self.path, base)

    @property
    def name(self):
        '''Last component of the node path.'''
        return os.path.basename(self.path)

    @property
    def type(self):
        '''Mimetype without any parameter following the first semicolon.'''
        return self.mimetype.split(";", 1)[0]

    def remove(self):
        '''
        Check that the node can be removed; subclasses do the removal.

        :raises OutsideRemovableBase: when not under removable base directory
        '''
        if not self.can_remove:
            raise OutsideRemovableBase("File outside removable base")

    @classmethod
    def from_urlpath(cls, path, app=None):
        '''
        Build a node from a path as received in a URL, resolving it against
        the ``directory_base`` of the given (or current) application.

        When the class has `generic` set to True, `directory_class` or
        `file_class` is instantiated depending on the resolved path being a
        directory; otherwise the class itself is used.

        :param path: relative path as from URL
        :param app: optional, flask application
        :return: node object pointing to path
        :rtype: Node
        '''
        app = app or current_app
        abspath = urlpath_to_abspath(path, app.config['directory_base'])
        if cls.generic:
            if os.path.isdir(abspath):
                kls = cls.directory_class
            else:
                kls = cls.file_class
        else:
            kls = cls
        return kls(path=abspath, app=app)

    @classmethod
    def register_file_class(cls, kls):
        '''Class decorator storing `kls` as `file_class`.'''
        cls.file_class = kls
        return kls

    @classmethod
    def register_directory_class(cls, kls):
        '''Class decorator storing `kls` as `directory_class`.'''
        cls.directory_class = kls
        return kls
146
147
148
@Node.register_file_class
class File(Node):
    '''
    Node pointing to a regular (non-directory) filesystem entry.
    '''
    can_download = True
    can_upload = False
    is_directory = False
    generic = False

    @property
    def default_action(self):
        '''
        Link action inherited from :class:`Node`, falling back to an
        ``open`` action built from the plugin manager link widget.
        '''
        inherited = super(File, self).default_action
        if inherited is not None:
            return inherited
        manager = self.plugin_manager
        widget = manager.link_class.from_file(self)
        return manager.action_class('open', widget)

    @cached_property
    def mimetype(self):
        '''Mimetype the plugin manager reports for the file path.'''
        return self.plugin_manager.get_mimetype(self.path)

    @cached_property
    def is_file(self):
        '''Whether `os.path.isfile` holds for the path.'''
        return os.path.isfile(self.path)

    @property
    def size(self):
        '''
        Human readable size: integer for plain bytes, two decimals for
        any bigger unit.
        '''
        amount, unit = fmt_size(
            self.stats.st_size,
            self.app.config["use_binary_multiples"]
            )
        template = "%d %s" if unit == binary_units[0] else "%.2f %s"
        return template % (amount, unit)

    @property
    def encoding(self):
        '''Charset declared in the mimetype, ``default`` when absent.'''
        if ";" not in self.mimetype:
            return "default"
        found = self.re_charset.search(self.mimetype)
        if not found:
            return "default"
        return found.groupdict().get("charset") or "default"

    def remove(self):
        '''
        Delete the file from the filesystem.

        :raises OutsideRemovableBase: when not under removable base directory
        '''
        super(File, self).remove()
        os.unlink(self.path)

    def download(self):
        '''
        Build the response which sends this file as an attachment.

        :returns: Response object as returned by flask's send_from_directory
        :rtype: flask.Response
        '''
        dirname, basename = os.path.split(self.path)
        return send_from_directory(dirname, basename, as_attachment=True)
206
207
208
@Node.register_directory_class
class Directory(Node):
    '''
    Node pointing to a filesystem directory.
    '''
    _listdir_cache = None
    mimetype = 'inode/directory'
    is_directory = True
    is_file = False
    size = 0
    encoding = 'default'
    generic = False

    @property
    def default_action(self):
        '''
        Link action inherited from :class:`Node`, falling back to a
        ``browse`` action built from the plugin manager link widget.
        '''
        action = super(Directory, self).default_action
        if action is None:
            widget = self.plugin_manager.link_class.from_file(self)
            action = self.plugin_manager.action_class('browse', widget)
        return action

    @cached_property
    def can_download(self):
        '''Value of the ``directory_downloadable`` config option.'''
        return self.app.config['directory_downloadable']

    @cached_property
    def can_upload(self):
        '''
        Truthy when this directory is the configured ``directory_upload``
        base or lies below it.
        '''
        dirbase = self.app.config["directory_upload"]
        return dirbase and (
            dirbase == self.path or
            self.path.startswith(dirbase + os.sep)
            )

    @cached_property
    def is_empty(self):
        '''
        Whether the directory has no entries.

        Uses the cached listing when available, otherwise stops scanning at
        the first entry found.
        '''
        if self._listdir_cache is not None:
            # A populated cache means the directory is NOT empty.
            return not self._listdir_cache
        for _ in self._listdir():
            return False
        return True

    def remove(self):
        '''
        Remove directory tree.

        :raises OutsideRemovableBase: when not under removable base directory
        '''
        super(Directory, self).remove()
        shutil.rmtree(self.path)

    def download(self):
        '''
        Get a Flask Response object streaming a tarball of this directory.

        :returns: Response object
        :rtype: flask.Response
        '''
        return self.app.response_class(
            TarFileStream(
                self.path,
                self.app.config["directory_tar_buffsize"]
                ),
            mimetype="application/octet-stream"
            )

    def contains(self, filename):
        '''
        Check if directory contains an entry with given filename.

        :param filename: filename to check
        :type filename: str
        :returns: True if exists, False otherwise.
        :rtype: bool
        '''
        return os.path.exists(os.path.join(self.path, filename))

    def choose_filename(self, filename, attempts=999):
        '''
        Get a new filename which does not collide with any entry on
        directory, based on given filename.

        Numbered alternatives are tried first; once they are exhausted,
        random alternatives are generated until a free one is found.

        :param filename: base filename
        :type filename: str
        :param attempts: number of attempts, defaults to 999
        :type attempts: int
        :returns: filename
        :rtype: str
        '''
        new_filename = filename
        for attempt in range(2, attempts + 1):
            if not self.contains(new_filename):
                return new_filename
            new_filename = alternative_filename(filename, attempt)
        while self.contains(new_filename):
            new_filename = alternative_filename(filename)
        return new_filename

    def _listdir(self):
        '''
        Iter unsorted entries on this directory.

        :yields: Directory or File instance for each entry in directory
        :ytype: Node
        '''
        precomputed_stats = os.name == 'nt'
        for entry in compat.scandir(self.path):
            kwargs = {'path': entry.path, 'app': self.app, 'parent': self}
            if precomputed_stats and not entry.is_symlink():
                # scandir entries expose `stat()`; there is no `stats()`.
                kwargs['stats'] = entry.stat()
            if entry.is_dir(follow_symlinks=True):
                yield self.directory_class(**kwargs)
                continue
            yield self.file_class(**kwargs)

    def sortkey(self, node):
        '''
        Order key function used by `listdir`.

        Could be overloaded to None by inherited classes.

        :param node: node to order
        :type node: Node
        :returns: tuple of not-directory and lowercase name
        :rtype: tuple of bool and str
        '''
        return not node.is_directory, node.name.lower()

    def listdir(self):
        '''
        Get list of nodes in this directory, sorted with `sortkey` unless it
        has been overloaded to a falsy value. The result is cached on the
        instance.

        :return: list of Node instances
        :rtype: list of Node
        '''
        if self._listdir_cache is None:
            if self.sortkey:
                self._listdir_cache = sorted(self._listdir(), key=self.sortkey)
            else:
                self._listdir_cache = list(self._listdir())
        return self._listdir_cache
346
347
348
class TarFileStream(object):
    '''
    Tarfile which compresses while reading for streaming.

    A worker thread writes the gzipped tarball into this object, which acts
    as the tarfile's output file object; `read` hands the buffered bytes to
    the consumer and lets the worker continue.

    Buffsize can be provided, it must be 512 multiple (the tar block size) for
    compression.
    '''
    event_class = threading.Event
    thread_class = threading.Thread
    tarfile_class = tarfile.open

    def __init__(self, path, buffsize=10240):
        '''
        :param path: directory (or file) to pack
        :param buffsize: tarfile stream buffer size, defaults to 10240
        '''
        self.path = path
        self.name = os.path.basename(path) + ".tgz"

        self._finished = 0  # 0: running, 1: worker done, 2: EOF delivered
        self._want = 0
        self._data = bytes()
        self._add = self.event_class()  # reader allows worker to write
        self._result = self.event_class()  # worker has data (or is done)
        self._tarfile = self.tarfile_class(  # stream write
            fileobj=self,
            mode="w|gz",
            bufsize=buffsize
            )
        self._th = self.thread_class(target=self.fill)
        self._th.start()

    def fill(self):
        '''
        Worker thread target: pack `path` and flush the stream.

        The finished flag is always raised, even on failure, so a reader
        waiting in `read` is never left blocked forever.
        '''
        try:
            self._tarfile.add(self.path, "")
            self._tarfile.close()  # force stream flush
        finally:
            self._finished += 1
            if not self._result.is_set():
                self._result.set()

    def write(self, data):
        '''
        File-object write used by tarfile; blocks until a reader asks for
        data and wakes the reader once more than the wanted size is buffered.

        :param data: compressed bytes
        :returns: number of bytes accepted
        :rtype: int
        '''
        self._add.wait()
        self._data += data
        if len(self._data) > self._want:
            self._add.clear()
            self._result.set()
        return len(data)

    def read(self, want=0):
        '''
        Get buffered tarball bytes.

        :param want: maximum number of bytes, 0 (default) for all available
        :returns: bytes, empty once when the stream is exhausted
        :rtype: bytes
        :raises EOFError: when reading after the end has been reported
        '''
        if self._finished:
            if self._data:
                # Worker is done but sized reads left bytes behind: drain
                # them before reporting the end of the stream.
                return self._take(want)
            if self._finished == 1:
                self._finished += 1
                return bytes()
            raise EOFError("EOF reached")

        # Thread communication
        self._want = want
        self._add.set()
        self._result.wait()
        self._result.clear()

        return self._take(want)

    def _take(self, want):
        '''Pop up to `want` bytes (everything if 0) from the buffer.'''
        if want:
            data = self._data[:want]
            self._data = self._data[want:]
        else:
            data = self._data
            self._data = bytes()
        return data

    def __iter__(self):
        '''Yield chunks of data until the stream is exhausted.'''
        data = self.read()
        while data:
            yield data
            data = self.read()
417
418
419
class OutsideDirectoryBase(Exception):
    '''Raised when a path is not under the expected base directory.'''
421
422
423
class OutsideRemovableBase(Exception):
    '''Raised when removing a node which is not under the removable base.'''
425
426
427
def fmt_size(size, binary=True):
    '''
    Scale a byte count down to the biggest fitting unit.

    The value is divided (by 1024 for binary units, 1000 otherwise) until it
    drops below 1000 or the largest unit is reached.

    :param size: size in bytes
    :param binary: whether use binary or standard units, defaults to True
    :return: size and unit
    :rtype: tuple of number and unit as str
    '''
    if binary:
        units, divider = binary_units, 1024.
    else:
        units, divider = standard_units, 1000.
    largest = units[-1]
    for unit in units[:-1]:
        if size < 1000:
            return (size, unit)
        size /= divider
    return size, largest
447
448
449
def relativize_path(path, base, os_sep=os.sep):
    '''
    Strip an absolute base (and its trailing separator) from an absolute
    path.

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: relative path
    :rtype: str or unicode
    :raises OutsideDirectoryBase: if path is not below base
    '''
    if check_under_base(path, base, os_sep):
        # Skip the base plus the separator when base does not carry it.
        skip = len(base) if base.endswith(os_sep) else len(base) + len(os_sep)
        return path[skip:]
    raise OutsideDirectoryBase("%r is not under %r" % (path, base))
466
467
468
def abspath_to_urlpath(path, base, os_sep=os.sep):
    '''
    Convert an absolute filesystem path into an uri path relative to the
    given absolute base.

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: relative uri
    :rtype: str or unicode
    :raises OutsideDirectoryBase: if resulting path is not below base
    '''
    relative = relativize_path(path, base, os_sep)
    return relative.replace(os_sep, '/')
480
481
482
def urlpath_to_abspath(path, base, os_sep=os.sep):
    '''
    Convert an uri relative path into an absolute filesystem path below the
    given absolute base.

    :param path: relative path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: absolute path
    :rtype: str or unicode
    :raises OutsideDirectoryBase: if resulting path is not below base
    '''
    prefix = base
    if not prefix.endswith(os_sep):
        prefix = prefix + os_sep
    native = path.replace('/', os_sep)
    # abspath normalizes the joined path (e.g. ".." components) so the
    # prefix check below applies to the final location.
    absolute = os.path.abspath(prefix + native)
    if absolute != base and not absolute.startswith(prefix):
        raise OutsideDirectoryBase("%r is not under %r" % (absolute, base))
    return absolute
498
499
500
def generic_filename(path):
    '''
    Extract the filename of given path os-independently, handling every
    known path separator.

    :param path: path
    :return: filename
    :rtype: str or unicode (depending on given path)
    '''
    for separator in common_path_separators:
        # rsplit leaves the path untouched when the separator is absent
        path = path.rsplit(separator, 1)[-1]
    return path
514
515
516
def clean_restricted_chars(path, restricted_chars=restricted_chars):
    '''
    Replace every restricted character of path by an underscore.

    :param path: path
    :param restricted_chars: characters to replace, defaults to the module
                             level `restricted_chars`
    :return: path without restricted characters
    :rtype: str or unicode (depending on given path)
    '''
    cleaned = path
    for forbidden in restricted_chars:
        cleaned = cleaned.replace(forbidden, '_')
    return cleaned
527
528
529
def check_forbidden_filename(filename,
                             destiny_os=os.name,
                             restricted_names=restricted_names):
    '''
    Get if given filename is forbidden for the destination OS.

    On ``nt`` the part of the filename before its first dot is compared,
    uppercased, against `nt_device_names`.

    :param filename: filename to check
    :param destiny_os: destination operative system
    :param restricted_names: names forbidden on any operative system
    :return: whether is forbidden on given OS or not
    :rtype: bool
    '''
    if destiny_os == 'nt':
        device = filename.split('.', 1)[0].upper()
        if device in nt_device_names:
            return True
    return filename in restricted_names
547
548
549
def check_under_base(path, base, os_sep=os.sep):
    '''
    Check if given absolute path is the given base or is located under it.

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: whether file is under given base or not
    :rtype: bool
    '''
    prefix = base
    if not prefix.endswith(os_sep):
        # Compare whole components: "/a" must not match "/ab"
        prefix += os_sep
    if path == base:
        return True
    return path.startswith(prefix)
560
561
562
def secure_filename(path, destiny_os=os.name, fs_encoding=compat.fs_encoding):
    '''
    Get rid of parent path components and special filenames.

    If path is invalid or protected, return empty string.

    :param path: unsafe path
    :type: str
    :param destiny_os: destination operative system
    :type destiny_os: str
    :param fs_encoding: destination filesystem filename encoding
    :type fs_encoding: str
    :return: filename or empty string
    :rtype: str
    '''
    # Decode byte strings before anything else: the helpers below mix the
    # path with text separators, which fails for bytes on Python 3.
    if isinstance(path, bytes):
        path = path.decode('latin-1', errors=underscore_replace)

    path = generic_filename(path)
    path = clean_restricted_chars(path)

    if check_forbidden_filename(path, destiny_os=destiny_os):
        return ''

    # Encode and recover from filesystem encoding in order to strip unwanted
    # characters out
    kwargs = dict(
        os_name=destiny_os,
        fs_encoding=fs_encoding,
        errors=underscore_replace
        )
    fs_encoded_path = compat.fsencode(path, **kwargs)
    fs_decoded_path = compat.fsdecode(fs_encoded_path, **kwargs)
    return fs_decoded_path
594
595
596
def alternative_filename(filename, attempt=None):
    '''
    Generate an alternative version of given filename.

    The extra text is inserted before the last (up to two) dot-separated
    extensions. If a numeric attempt is given it is used as ``(attempt)``,
    otherwise eight random characters are used.

    :param filename: original filename
    :param attempt: optional attempt number, defaults to None
    :return: new filename
    :rtype: str or unicode
    '''
    pieces = filename.rsplit(u'.', 2)
    stem, suffixes = pieces[0], pieces[1:]
    extension = ''.join(u'.%s' % suffix for suffix in suffixes)
    if attempt is not None:
        marker = u' (%d)' % attempt
    else:
        pick = random.choice
        marker = u' %s' % ''.join(pick(fs_safe_characters) for _ in range(8))
    return u'%s%s%s' % (stem, marker, extension)
617