Completed
Push — dev-4.1 ( dbd54d...c25dff )
by Felipe A.
01:42
created

Node.from_urlpath()   B

Complexity

Conditions 3

Size

Total Lines 24

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
dl 0
loc 24
rs 8.9713
c 0
b 0
f 0
1
#!/usr/bin/env python
2
# -*- coding: UTF-8 -*-
3
4
import os
5
import os.path
6
import re
7
import shutil
8
import codecs
9
import threading
10
import string
11
import tarfile
12
import random
13
import datetime
14
import logging
15
16
from flask import current_app, send_from_directory
17
from werkzeug.utils import cached_property
18
19
from . import compat
20
from .compat import range
21
22
23
logger = logging.getLogger(__name__)

# Replacement character in the text type of the running interpreter.
unicode_underscore = '_'.decode('utf-8') if compat.PY_LEGACY else '_'

# Name of the codec error handler registered below; namespaced with the module
# name so it cannot clash with handlers registered elsewhere.
underscore_replace = '%s:underscore' % __name__

# The handler substitutes an underscore and resumes one position after the
# start of the offending sequence.
codecs.register_error(underscore_replace,
                      lambda error: (unicode_underscore, error.start + 1)
                      )

# Unit labels used by fmt_size, smallest first.
binary_units = ("B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB")
standard_units = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")

# Characters treated as path separators regardless of the running OS.
common_path_separators = '\\/'

# Characters replaced by clean_restricted_chars.
restricted_chars = '\\/\0'

# Names check_forbidden_filename rejects on every OS.
restricted_names = ('.', '..', '::', os.sep)

# Device names reserved by Windows. COM and LPT ports are reserved from 1 to 9,
# not only the first few, so every one of them is listed.
nt_device_names = (
    'CON', 'PRN', 'AUX', 'NUL',
    'COM1', 'COM2', 'COM3', 'COM4', 'COM5', 'COM6', 'COM7', 'COM8', 'COM9',
    'LPT1', 'LPT2', 'LPT3', 'LPT4', 'LPT5', 'LPT6', 'LPT7', 'LPT8', 'LPT9',
    )

# Alphabet used for the random suffix generated by alternative_filename.
fs_safe_characters = string.ascii_uppercase + string.digits
37
38
39
class Node(object):
    '''
    Base class for filesystem entries.

    When `generic` is True, :meth:`from_urlpath` instantiates the registered
    `directory_class` or `file_class` instead of the class itself.
    '''
    generic = True
    directory_class = None  # set later at import time
    file_class = None  # set later at import time

    re_charset = re.compile('; charset=(?P<charset>[^;]+)')
    can_download = False

    def __init__(self, path=None, app=None, **defaults):
        '''
        :param path: filesystem path, decoded with `compat.fsdecode` if given
        :param app: optional, flask application (defaults to current app)
        :param defaults: extra attributes stored directly on the instance
        '''
        if path:
            self.path = compat.fsdecode(path)
        else:
            self.path = None
        self.app = app if app is not None else current_app
        # Values given here take the place of the lazily computed attributes
        # with the same name (e.g. `parent` or `stats`).
        self.__dict__.update(defaults)

    @property
    def plugin_manager(self):
        '''Plugin manager registered on the application extensions.'''
        return self.app.extensions['plugin_manager']

    @property
    def default_action(self):
        '''First action whose widget is placed as 'link', None if absent.'''
        link_actions = (
            action
            for action in self.actions
            if action.widget.place == 'link'
            )
        return next(link_actions, None)

    @cached_property
    def actions(self):
        '''Actions the plugin manager provides for this node.'''
        return self.plugin_manager.get_actions(self)

    @cached_property
    def can_remove(self):
        '''
        Truthy only if a removable base is configured and this node lives
        strictly below it.
        '''
        removable_base = self.app.config["directory_remove"]
        if not removable_base:
            return removable_base
        return self.path.startswith(removable_base + os.sep)

    @cached_property
    def stats(self):
        '''Result of `os.stat` on the node path.'''
        return os.stat(self.path)

    @cached_property
    def parent(self):
        '''Parent directory node, or None for the configured base.'''
        if self.path != self.app.config['directory_base']:
            return self.directory_class(os.path.dirname(self.path), self.app)
        return None

    @cached_property
    def ancestors(self):
        '''List of parent nodes, nearest first.'''
        chain = []
        node = self.parent
        while node:
            chain.append(node)
            node = node.parent
        return chain

    @property
    def modified(self):
        '''Modification time formatted as 'YYYY.MM.DD HH:MM:SS'.'''
        moment = datetime.datetime.fromtimestamp(self.stats.st_mtime)
        return moment.strftime('%Y.%m.%d %H:%M:%S')

    @property
    def urlpath(self):
        '''Path relative to the configured base, using '/' as separator.'''
        base = self.app.config['directory_base']
        return abspath_to_urlpath(self.path, base)

    @property
    def name(self):
        '''Last component of the node path.'''
        return os.path.basename(self.path)

    @property
    def type(self):
        '''Mimetype without parameters (the part before the first ';').'''
        essence, _, _ = self.mimetype.partition(";")
        return essence

    def remove(self):
        '''
        Check that the node can be removed; subclasses perform the removal.

        :raises OutsideRemovableBase: when not under removable base directory
        '''
        if self.can_remove:
            return
        raise OutsideRemovableBase("File outside removable base")

    @classmethod
    def from_urlpath(cls, path, app=None):
        '''
        Alternative constructor which accepts a path as taken from URL and uses
        the given app or the current app config to get the real path.

        If class has attribute `generic` set to True, `directory_class` or
        `file_class` will be used as type.

        :param path: relative path as from URL
        :param app: optional, flask application
        :return: node object pointing to path
        :rtype: Node
        :raises OutsideDirectoryBase: if resulting path is not below base
        '''
        if not app:
            app = current_app
        real_path = urlpath_to_abspath(path, app.config['directory_base'])
        if cls.generic:
            if os.path.isdir(real_path):
                kls = cls.directory_class
            else:
                kls = cls.file_class
        else:
            kls = cls
        return kls(path=real_path, app=app)

    @classmethod
    def register_file_class(cls, kls):
        '''Class decorator: set `kls` as `file_class` and return it.'''
        cls.file_class = kls
        return kls

    @classmethod
    def register_directory_class(cls, kls):
        '''Class decorator: set `kls` as `directory_class` and return it.'''
        cls.directory_class = kls
        return kls
145
146
147
@Node.register_file_class
class File(Node):
    '''
    Node for a non-directory filesystem entry.
    '''
    can_download = True
    can_upload = False
    is_directory = False
    generic = False

    @property
    def default_action(self):
        '''Inherited link action, or an 'open' action built as fallback.'''
        inherited = super(File, self).default_action
        if inherited is not None:
            return inherited
        widget = self.plugin_manager.link_class.from_file(self)
        return self.plugin_manager.action_class('open', widget)

    @cached_property
    def mimetype(self):
        '''Mimetype as resolved by the plugin manager for this path.'''
        return self.plugin_manager.get_mimetype(self.path)

    @cached_property
    def is_file(self):
        '''Whether the path is a regular file according to `os.path`.'''
        return os.path.isfile(self.path)

    @property
    def size(self):
        '''
        Human readable size: whole number for bytes, two decimals otherwise.
        '''
        use_binary = self.app.config["use_binary_multiples"]
        amount, unit = fmt_size(self.stats.st_size, use_binary)
        template = "%d %s" if unit == binary_units[0] else "%.2f %s"
        return template % (amount, unit)

    @property
    def encoding(self):
        '''Charset declared in the mimetype, 'default' when there is none.'''
        mimetype = self.mimetype
        if ";" not in mimetype:
            return "default"
        found = self.re_charset.search(mimetype)
        if found is None:
            return "default"
        return found.group("charset") or "default"

    def remove(self):
        '''
        Remove file.

        :raises OutsideRemovableBase: when not under removable base directory
        '''
        super(File, self).remove()
        os.unlink(self.path)

    def download(self):
        '''
        Get a Flask Response object which sends this file as an attachment.

        :returns: Response object as returned by flask's send_from_directory
        :rtype: flask.Response
        '''
        folder = os.path.dirname(self.path)
        filename = os.path.basename(self.path)
        return send_from_directory(folder, filename, as_attachment=True)
205
206
207
@Node.register_directory_class
class Directory(Node):
    '''
    Node for a filesystem directory.

    The sorted listing produced by :meth:`listdir` is kept on the instance
    in `_listdir_cache` and reused by later calls.
    '''
    _listdir_cache = None
    mimetype = 'inode/directory'
    is_directory = True
    is_file = False
    size = 0
    encoding = 'default'
    generic = False

    @property
    def default_action(self):
        '''Inherited link action, or a 'browse' action built as fallback.'''
        action = super(Directory, self).default_action
        if action is None:
            widget = self.plugin_manager.link_class.from_file(self)
            action = self.plugin_manager.action_class('browse', widget)
        return action

    @cached_property
    def can_download(self):
        '''Value of the 'directory_downloadable' application setting.'''
        return self.app.config['directory_downloadable']

    @cached_property
    def can_upload(self):
        '''
        Truthy only if an upload base is configured and this directory is
        that base or lives below it.
        '''
        dirbase = self.app.config["directory_upload"]
        return dirbase and (
            dirbase == self.path or
            self.path.startswith(dirbase + os.sep)
            )

    @cached_property
    def is_empty(self):
        '''
        Whether the directory has no entries.

        :rtype: bool
        '''
        if self._listdir_cache is not None:
            # A populated cache means the directory is NOT empty.
            return not self._listdir_cache
        # Only the first entry is needed to decide, so stop right there.
        for entry in self._listdir():
            return False
        return True

    def remove(self):
        '''
        Remove directory tree.

        :raises OutsideRemovableBase: when not under removable base directory
        '''
        super(Directory, self).remove()
        shutil.rmtree(self.path)

    def download(self):
        '''
        Get a Flask Response object streaming a tarball of this directory.

        :returns: Response object
        :rtype: flask.Response
        '''
        return self.app.response_class(
            TarFileStream(
                self.path,
                self.app.config["directory_tar_buffsize"]
                ),
            mimetype="application/octet-stream"
            )

    def contains(self, filename):
        '''
        Check if directory contains an entry with given filename.

        :param filename: filename to check
        :type filename: str
        :returns: True if exists, False otherwise.
        :rtype: bool
        '''
        return os.path.exists(os.path.join(self.path, filename))

    def choose_filename(self, filename, attempts=999):
        '''
        Get a new filename which does not collide with any entry on directory,
        based on given filename.

        The given filename is tried first, then numbered alternatives from 2
        up to `attempts`, and finally random alternatives until one is free.

        :param filename: base filename
        :type filename: str
        :param attempts: number of attempts, defaults to 999
        :type attempts: int
        :returns: filename
        :rtype: str
        '''
        new_filename = filename
        for attempt in range(2, attempts + 1):
            if not self.contains(new_filename):
                return new_filename
            new_filename = alternative_filename(filename, attempt)
        while self.contains(new_filename):
            new_filename = alternative_filename(filename)
        return new_filename

    def _listdir(self):
        '''
        Iter unsorted entries on this directory.

        :yields: Directory or File instance for each entry in directory
        :ytype: Node
        '''
        # Stats are only taken from the directory entries on Windows.
        # NOTE(review): presumably because scandir already holds them there,
        # so reusing them avoids an extra stat call per entry -- confirm.
        precomputed_stats = os.name == 'nt'
        for entry in compat.scandir(self.path):
            kwargs = {'path': entry.path, 'app': self.app, 'parent': self}
            if precomputed_stats and not entry.is_symlink():
                # The scandir entry API names this method `stat`.
                kwargs['stats'] = entry.stat()
            if entry.is_dir(follow_symlinks=True):
                yield self.directory_class(**kwargs)
                continue
            yield self.file_class(**kwargs)

    def listdir(self):
        '''
        Get sorted list (by `is_directory` and `name` properties) of Node
        objects: directories first, then case-insensitively by name.

        :return: sorted list of Node instances
        :rtype: list of Node
        '''
        if self._listdir_cache is None:
            self._listdir_cache = sorted(
                self._listdir(),
                key=lambda f: (not f.is_directory, f.name.lower())
                )
        return self._listdir_cache
332
333
334
class TarFileStream(object):
    '''
    Tarfile which compresses while reading for streaming.

    A worker thread writes the gzipped tarball into this object, which acts
    as the archive's output file, while the consumer pulls chunks through
    :meth:`read` or by iterating. Two events coordinate both sides: `_add`
    lets the worker append data and `_result` tells the reader that data
    (or the end of the stream) is available.

    Buffsize can be provided, it must be 512 multiple (the tar block size) for
    compression.
    '''
    event_class = threading.Event
    thread_class = threading.Thread
    tarfile_class = tarfile.open

    def __init__(self, path, buffsize=10240):
        '''
        Start archiving `path` on a background thread.

        :param path: path of the file or directory to archive
        :param buffsize: buffer size handed to tarfile, defaults to 10240
        '''
        self.path = path
        self.name = os.path.basename(path) + ".tgz"

        self._finished = 0  # 0: running, 1: worker done, 2: EOF reported
        self._want = 0
        self._data = bytes()
        self._add = self.event_class()
        self._result = self.event_class()
        self._tarfile = self.tarfile_class(  # stream write
            fileobj=self,
            mode="w|gz",
            bufsize=buffsize
            )
        self._th = self.thread_class(target=self.fill)
        self._th.start()

    def fill(self):
        '''
        Worker thread body: archive the path and signal completion.
        '''
        try:
            self._tarfile.add(self.path, "")
            self._tarfile.close()  # force stream flush
        finally:
            # Always mark the stream as finished and wake the reader, even
            # when archiving fails, so a pending read() cannot block forever.
            self._finished += 1
            if not self._result.is_set():
                self._result.set()

    def write(self, data):
        '''
        File-like write used by tarfile; blocks until a reader asks for data.

        :param data: chunk of archive bytes
        :return: number of bytes accepted
        :rtype: int
        '''
        self._add.wait()
        self._data += data
        if len(self._data) > self._want:
            # Enough data for the pending read: pause writing, wake reader.
            self._add.clear()
            self._result.set()
        return len(data)

    def read(self, want=0):
        '''
        Get the next chunk of archive data.

        :param want: maximum number of bytes, 0 for all available data
        :return: data, empty once the stream is exhausted
        :rtype: bytes
        :raises EOFError: when reading again after the end was reported
        '''
        if self._finished:
            if self._finished == 1:
                self._finished += 1
                # Empty bytes, matching the type of every other chunk.
                return bytes()
            raise EOFError("EOF reached")

        # Thread communication
        self._want = want
        self._add.set()
        self._result.wait()
        self._result.clear()

        if want:
            data = self._data[:want]
            self._data = self._data[want:]
        else:
            data = self._data
            self._data = bytes()
        return data

    def __iter__(self):
        '''
        Yield chunks of archive data until an empty chunk is read.
        '''
        data = self.read()
        while data:
            yield data
            data = self.read()
403
404
405
class OutsideDirectoryBase(Exception):
    '''Raised when a path is not under the expected base directory.'''
407
408
409
class OutsideRemovableBase(Exception):
    '''Raised when removing a node that is not under the removable base.'''
411
412
413
def fmt_size(size, binary=True):
    '''
    Scale a byte count to the largest fitting unit.

    The value moves to the next unit while it is 1000 or greater, for both
    unit systems; only the divider (1024 or 1000) differs between them.

    :param size: size in bytes
    :param binary: whether use binary or standard units, defaults to True
    :return: size and unit
    :rtype: tuple of number and unit as str
    '''
    if binary:
        units, divider = binary_units, 1024.
    else:
        units, divider = standard_units, 1000.
    last = len(units) - 1
    index = 0
    while index < last:
        if size < 1000:
            return (size, units[index])
        size /= divider
        index += 1
    # Whatever remains is expressed in the largest unit available.
    return size, units[last]
433
434
435
def relativize_path(path, base, os_sep=os.sep):
    '''
    Strip an absolute base from an absolute path.

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: relative path
    :rtype: str or unicode
    :raises OutsideDirectoryBase: if path is not below base
    '''
    if check_under_base(path, base, os_sep):
        # Skip the base plus the separator following it, unless the base
        # already ends with one.
        if base.endswith(os_sep):
            skip = len(base)
        else:
            skip = len(base) + len(os_sep)
        return path[skip:]
    raise OutsideDirectoryBase("%r is not under %r" % (path, base))
452
453
454
def abspath_to_urlpath(path, base, os_sep=os.sep):
    '''
    Convert an absolute filesystem path into a base-relative uri path.

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: relative uri
    :rtype: str or unicode
    :raises OutsideDirectoryBase: if resulting path is not below base
    '''
    relative = relativize_path(path, base, os_sep)
    return relative.replace(os_sep, '/')
466
467
468
def urlpath_to_abspath(path, base, os_sep=os.sep):
    '''
    Convert a base-relative uri path into an absolute filesystem path.

    :param path: relative path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: absolute path
    :rtype: str or unicode
    :raises OutsideDirectoryBase: if resulting path is not below base
    '''
    if base.endswith(os_sep):
        prefix = base
    else:
        prefix = base + os_sep
    native_path = path.replace('/', os_sep)
    # abspath collapses '..' components, so the check below sees where the
    # path really points.
    realpath = os.path.abspath(prefix + native_path)
    if realpath != base and not realpath.startswith(prefix):
        raise OutsideDirectoryBase("%r is not under %r" % (realpath, base))
    return realpath
484
485
486
def generic_filename(path):
    '''
    Extract filename of given path os-independently, taking care of known
    path separators.

    :param path: path
    :return: filename
    :rtype: str or unicode (depending on given path)
    '''
    for separator in common_path_separators:
        # Keep what follows the last separator; a path without it is left
        # as it is.
        path = path.rsplit(separator, 1)[-1]
    return path
500
501
502
def clean_restricted_chars(path, restricted_chars=restricted_chars):
    '''
    Replace every restricted character of path with an underscore.

    :param path: path
    :param restricted_chars: characters to replace, defaults to the module
                             level `restricted_chars`
    :return: path without restricted characters
    :rtype: str or unicode (depending on given path)
    '''
    cleaned = path
    for forbidden in restricted_chars:
        if forbidden in cleaned:
            cleaned = cleaned.replace(forbidden, '_')
    return cleaned
513
514
515
def check_forbidden_filename(filename,
                             destiny_os=os.name,
                             restricted_names=restricted_names):
    '''
    Get if given filename is forbidden for given OS.

    A filename is forbidden if it is one of the restricted names or, when
    the destination is 'nt', if the part before its first dot is a device
    name (compared case-insensitively).

    :param filename: filename to check
    :param destiny_os: destination operative system
    :param restricted_names: names forbidden on every OS, defaults to the
                             module level `restricted_names`
    :return: whether is forbidden on given OS or not
    :rtype: bool
    '''
    if filename in restricted_names:
        return True
    if destiny_os != 'nt':
        return False
    first_component = filename.split('.', 1)[0]
    return first_component.upper() in nt_device_names
533
534
535
def check_under_base(path, base, os_sep=os.sep):
    '''
    Check if given absolute path is the given base or lives below it.

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: whether file is under given base or not
    :rtype: bool
    '''
    # Compare against the base with a trailing separator so that a sibling
    # sharing the same leading characters is not taken as a child.
    if base.endswith(os_sep):
        prefix = base
    else:
        prefix = base + os_sep
    if path == base:
        return True
    return path.startswith(prefix)
546
547
548
def secure_filename(path, destiny_os=os.name, fs_encoding=compat.fs_encoding):
    '''
    Get rid of parent path components and special filenames.

    If path is invalid or protected, return empty string.

    :param path: unsafe path
    :type: str
    :param destiny_os: destination operative system
    :type destiny_os: str
    :param fs_encoding: destination filesystem filename encoding
    :return: filename or empty string
    :rtype: str
    '''
    filename = clean_restricted_chars(generic_filename(path))

    if check_forbidden_filename(filename, destiny_os=destiny_os):
        return ''

    if isinstance(filename, bytes):
        filename = filename.decode('latin-1', errors=underscore_replace)

    # Round-trip through the filesystem encoding so that characters it
    # cannot represent are replaced by underscores.
    options = {
        'os_name': destiny_os,
        'fs_encoding': fs_encoding,
        'errors': underscore_replace,
        }
    return compat.fsdecode(compat.fsencode(filename, **options), **options)
580
581
582
def alternative_filename(filename, attempt=None):
    '''
    Generates an alternative version of given filename.

    The extra text is inserted before the extension, where up to the last
    two dot-separated components count as extension. If a number attempt
    parameter is given it will be used on the alternative name, a random
    value will be used otherwise.

    :param filename: original filename
    :param attempt: optional attempt number, defaults to null
    :return: new filename
    :rtype: str or unicode
    '''
    parts = filename.rsplit(u'.', 2)
    stem, extensions = parts[0], parts[1:]
    if attempt is None:
        token = ''.join(random.choice(fs_safe_characters) for _ in range(8))
        extra = u' %s' % token
    else:
        extra = u' (%d)' % attempt
    suffix = ''.join(u'.%s' % extension for extension in extensions)
    return u'%s%s%s' % (stem, extra, suffix)
603