Completed
Push — master ( b13734...b2c969 )
by Felipe A.
47s
created

browsepy.File.default_action()   A

Complexity

Conditions 4

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 4
dl 0
loc 8
rs 9.2
1
#!/usr/bin/env python
2
# -*- coding: UTF-8 -*-
3
4
import sys
5
import os
6
import os.path
7
import re
8
import shutil
9
import codecs
10
import threading
11
import string
12
import tarfile
13
import random
14
import datetime
15
import functools
16
17
from flask import current_app, send_from_directory, Response
18
from werkzeug.utils import cached_property
19
20
from .compat import PY_LEGACY, range, FileNotFoundError
21
22
undescore_replace = '%s:underscore' % __name__
23
codecs.register_error(undescore_replace,
24
                      (lambda error: (u'_', error.start + 1))
25
                      if PY_LEGACY else
26
                      (lambda error: ('_', error.start + 1))
27
                      )
28
if not PY_LEGACY:
29
    unicode = str
30
31
32
class File(object):
33
    re_charset = re.compile('; charset=(?P<charset>[^;]+)')
34
    parent_class = None # none means current class
35
36
    def __init__(self, path=None, app=None):
37
        self.path = path
38
        self.app = current_app if app is None else app
39
40
    def remove(self):
41
        if not self.can_remove:
42
            raise OutsideRemovableBase("File outside removable base")
43
        if self.is_directory:
44
            shutil.rmtree(self.path)
45
        else:
46
            os.unlink(self.path)
47
48
    def download(self):
49
        if self.is_directory:
50
            stream = TarFileStream(
51
                self.path,
52
                self.app.config["directory_tar_buffsize"]
53
                )
54
            return Response(stream, mimetype="application/octet-stream")
55
        directory, name = os.path.split(self.path)
56
        return send_from_directory(directory, name, as_attachment=True)
57
58
    def contains(self, filename):
59
        return os.path.exists(os.path.join(self.path, filename))
60
61
    def choose_filename(self, filename, attempts=999):
62
        new_filename = filename
63
        for attempt in range(2, attempts+1):
64
            if not self.contains(new_filename):
65
                return new_filename
66
            new_filename = alternative_filename(filename, attempt)
67
        while self.contains(new_filename):
68
            new_filename = alternative_filename(filename)
69
        return new_filename
70
71
    @property
72
    def plugin_manager(self):
73
        return self.app.extensions['plugin_manager']
74
75
    @property
76
    def default_action(self):
77
        for action in self.actions:
78
            if action.widget.place == 'link':
79
                return action
80
        endpoint = 'browse' if self.is_directory else 'open'
81
        widget = self.plugin_manager.link_class.from_file(self)
82
        return self.plugin_manager.action_class(endpoint, widget)
83
84
    @cached_property
85
    def actions(self):
86
        return self.plugin_manager.get_actions(self)
87
88
    @cached_property
89
    def can_download(self):
90
        return self.app.config['directory_downloadable'] or not self.is_directory
91
92
    @cached_property
93
    def can_remove(self):
94
        dirbase = self.app.config["directory_remove"]
95
        if dirbase:
96
            return self.path.startswith(dirbase + os.sep)
97
        return False
98
99
    @cached_property
100
    def can_upload(self):
101
        dirbase = self.app.config["directory_upload"]
102
        if self.is_directory and dirbase:
103
            return dirbase == self.path or self.path.startswith(dirbase + os.sep)
104
        return False
105
106
    @cached_property
107
    def stats(self):
108
        return os.stat(self.path)
109
110
    @cached_property
111
    def mimetype(self):
112
        if self.is_directory:
113
            return 'inode/directory'
114
        return self.plugin_manager.get_mimetype(self.path)
115
116
    @cached_property
117
    def is_directory(self):
118
        return os.path.isdir(self.path)
119
120
    @cached_property
121
    def is_file(self):
122
        return os.path.isfile(self.path)
123
124
    @cached_property
125
    def is_empty(self):
126
        return not self._listdir
127
128
    @cached_property
129
    def parent(self):
130
        if self.path == self.app.config['directory_base']:
131
            return None
132
        parent_class = self.parent_class or self.__class__
133
        return parent_class(os.path.dirname(self.path), self.app)
134
135
    @cached_property
136
    def ancestors(self):
137
        ancestors = []
138
        parent = self.parent
139
        while parent:
140
            ancestors.append(parent)
141
            parent = parent.parent
142
        return tuple(ancestors)
143
144
    @property
145
    def modified(self):
146
        return datetime.datetime.fromtimestamp(self.stats.st_mtime).strftime('%Y.%m.%d %H:%M:%S')
147
148
    @property
149
    def size(self):
150
        size, unit = fmt_size(self.stats.st_size, self.app.config["use_binary_multiples"])
151
        if unit == binary_units[0]:
152
            return "%d %s" % (size, unit)
153
        return "%.2f %s" % (size, unit)
154
155
    @property
156
    def urlpath(self):
157
        return abspath_to_urlpath(self.path, self.app.config['directory_base'])
158
159
    @property
160
    def name(self):
161
        return os.path.basename(self.path)
162
163
    @property
164
    def type(self):
165
        return self.mimetype.split(";", 1)[0]
166
167
    @property
168
    def encoding(self):
169
        if ";" in self.mimetype:
170
            match = self.re_charset.search(self.mimetype)
171
            gdict = match.groupdict() if match else {}
172
            return gdict.get("charset") or "default"
173
        return "default"
174
175
    def listdir(self):
176
        path_joiner = functools.partial(os.path.join, self.path)
177
        content = [
178
            self.__class__(path=path_joiner(path), app=self.app)
179
            for path in os.listdir(self.path)
180
            ]
181
        content.sort(key=lambda f: (f.is_directory, f.name.lower()))
182
        return content
183
184
    @classmethod
185
    def from_urlpath(cls, path, app=None):
186
        app = app or current_app
187
        base = app.config['directory_base']
188
        return cls(path=urlpath_to_abspath(path, base), app=app)
189
190
191
class TarFileStream(object):
192
    '''
193
    Tarfile which compresses while reading for streaming.
194
195
    Buffsize can be provided, it must be 512 multiple (the tar block size) for
196
    compression.
197
    '''
198
    event_class = threading.Event
199
    thread_class = threading.Thread
200
    tarfile_class = tarfile.open
201
202
    def __init__(self, path, buffsize=10240):
203
        self.path = path
204
        self.name = os.path.basename(path) + ".tgz"
205
206
        self._finished = 0
207
        self._want = 0
208
        self._data = bytes()
209
        self._add = self.event_class()
210
        self._result = self.event_class()
211
        self._tarfile = self.tarfile_class(fileobj=self, mode="w|gz", bufsize=buffsize) # stream write
212
        self._th = self.thread_class(target=self.fill)
213
        self._th.start()
214
215
    def fill(self):
216
        self._tarfile.add(self.path, "")
217
        self._tarfile.close() # force stream flush
218
        self._finished += 1
219
        if not self._result.is_set():
220
            self._result.set()
221
222
    def write(self, data):
223
        self._add.wait()
224
        self._data += data
225
        if len(self._data) > self._want:
226
            self._add.clear()
227
            self._result.set()
228
        return len(data)
229
230
    def read(self, want=0):
231
        if self._finished:
232
            if self._finished == 1:
233
                self._finished += 1
234
                return ""
235
            return EOFError("EOF reached")
236
237
        # Thread communication
238
        self._want = want
239
        self._add.set()
240
        self._result.wait()
241
        self._result.clear()
242
243
        if want:
244
            data = self._data[:want]
245
            self._data = self._data[want:]
246
        else:
247
            data = self._data
248
            self._data = bytes()
249
        return data
250
251
    def __iter__(self):
252
        data = self.read()
253
        while data:
254
            yield data
255
            data = self.read()
256
257
258
class OutsideDirectoryBase(Exception):
259
    pass
260
261
262
class OutsideRemovableBase(Exception):
263
    pass
264
265
266
binary_units = ("B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB")
267
standard_units = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
268
def fmt_size(size, binary=True):
269
    '''
270
    Get size and unit.
271
272
    :param size: size in bytes
273
    :param binary: whether use binary or standard units, defaults to True
274
    :return: size and unit
275
    :rtype: tuple of int and unit as str
276
    '''
277
    if binary:
278
        fmt_sizes = binary_units
279
        fmt_divider = 1024.
280
    else:
281
        fmt_sizes = standard_units
282
        fmt_divider = 1000.
283
    for fmt in fmt_sizes[:-1]:
284
        if size < 1000:
285
            return (size, fmt)
286
        size /= fmt_divider
287
    return size, fmt_sizes[-1]
288
289
def relativize_path(path, base, os_sep=os.sep):
290
    '''
291
    Make absolute path relative to an absolute base.
292
293
    :param path: absolute path
294
    :param base: absolute base path
295
    :param os_sep: path component separator, defaults to current OS separator
296
    :return: relative path
297
    :rtype: str or unicode
298
    :raises OutsideDirectoryBase: if path is not below base
299
    '''
300
    if not check_under_base(path, base, os_sep):
301
        raise OutsideDirectoryBase("%r is not under %r" % (path, base))
302
    prefix_len = len(base)
303
    if not base.endswith(os_sep):
304
        prefix_len += len(os_sep)
305
    return path[prefix_len:]
306
307
def abspath_to_urlpath(path, base, os_sep=os.sep):
308
    '''
309
    Make filesystem absolute path uri relative using given absolute base path.
310
311
    :param path: absolute path
312
    :param base: absolute base path
313
    :param os_sep: path component separator, defaults to current OS separator
314
    :return: relative uri
315
    :rtype: str or unicode
316
    :raises OutsideDirectoryBase: if resulting path is not below base
317
    '''
318
    return relativize_path(path, base, os_sep).replace(os_sep, '/')
319
320
def urlpath_to_abspath(path, base, os_sep=os.sep):
321
    '''
322
    Make uri relative path fs absolute using a given absolute base path.
323
324
    :param path: relative path
325
    :param base: absolute base path
326
    :param os_sep: path component separator, defaults to current OS separator
327
    :return: absolute path
328
    :rtype: str or unicode
329
    :raises OutsideDirectoryBase: if resulting path is not below base
330
    '''
331
    prefix = base if base.endswith(os_sep) else base + os_sep
332
    realpath = os.path.abspath(prefix + path.replace('/', os_sep))
333
    if base == realpath or realpath.startswith(prefix):
334
        return realpath
335
    raise OutsideDirectoryBase("%r is not under %r" % (realpath, base))
336
337
common_path_separators = '\\/'
338
def generic_filename(path):
339
    '''
340
    Extract filename of given path os-indepently, taking care of known path separators.
341
342
    :param path: path
343
    :return: filename
344
    :rtype: str or unicode (depending on given path)
345
    '''
346
347
    for sep in common_path_separators:
348
        if sep in path:
349
            _, path = path.rsplit(sep, 1)
350
    return path
351
352
restricted_chars = '\\/\0'
353
def clean_restricted_chars(path, restricted_chars=restricted_chars):
354
    '''
355
    Get path without restricted characters.
356
357
    :param path: path
358
    :return: path without restricted characters
359
    :rtype: str or unicode (depending on given path)
360
    '''
361
    for character in restricted_chars:
362
        path = path.replace(character, '_')
363
    return path
364
365
restricted_names = ('.', '..', '::', os.sep)
366
nt_device_names = ('CON', 'AUX', 'COM1', 'COM2', 'COM3', 'COM4', 'LPT1', 'LPT2', 'LPT3', 'PRN', 'NUL')
367
fs_encoding = 'unicode' if os.name == 'nt' else sys.getfilesystemencoding() or 'ascii'
368
def check_forbidden_filename(filename, destiny_os=os.name, fs_encoding=fs_encoding,
369
                             restricted_names=restricted_names):
370
    '''
371
    Get if given filename is forbidden for current OS or filesystem.
372
373
    :param filename:
374
    :param destiny_os: destination operative system
375
    :param fs_encoding: destination filesystem filename encoding
376
    :return: wether is forbidden on given OS (or filesystem) or not
377
    :rtype: bool
378
    '''
379
    if destiny_os == 'nt':
380
        fpc = filename.split('.', 1)[0].upper()
381
        if fpc in nt_device_names:
382
            return True
383
384
    return filename in restricted_names
385
386
def check_under_base(path, base, os_sep=os.sep):
387
    '''
388
    Check if given absolute path is under given base.
389
390
    :param path: absolute path
391
    :param base: absolute base path
392
    :return: wether file is under given base or not
393
    :rtype: bool
394
    '''
395
    prefix = base if base.endswith(os_sep) else base + os_sep
396
    return path == base or path.startswith(prefix)
397
398
def secure_filename(path, destiny_os=os.name, fs_encoding=fs_encoding):
399
    '''
400
    Get rid of parent path components and special filenames.
401
402
    If path is invalid or protected, return empty string.
403
404
    :param path: unsafe path
405
    :param destiny_os: destination operative system
406
    :param fs_encoding: destination filesystem filename encoding
407
    :return: filename or empty string
408
    :rtype: str or unicode (depending on python version, destiny_os and fs_encoding)
409
    '''
410
    path = generic_filename(path)
411
    path = clean_restricted_chars(path)
412
413
    if check_forbidden_filename(path, destiny_os=destiny_os, fs_encoding=fs_encoding):
414
        return ''
415
416
    if fs_encoding != 'unicode':
417
        if PY_LEGACY and not isinstance(path, unicode):
418
            path = unicode(path, encoding='latin-1')
419
        path = path.encode(fs_encoding, errors=undescore_replace).decode(fs_encoding)
420
421
    return path
422
423
fs_safe_characters = string.ascii_uppercase + string.digits
424
def alternative_filename(filename, attempt=None):
425
    '''
426
    Generates an alternative version of given filename.
427
428
    If an number attempt parameter is given, will be used on the alternative
429
    name, a random value will be used otherwise.
430
431
    :param filename: original filename
432
    :param attempt: optional attempt number, defaults to null
433
    :return: new filename
434
    :rtype: str or unicode
435
    '''
436
    filename_parts = filename.rsplit('.', 2)
437
    name = filename_parts[0]
438
    ext = ''.join('.%s' % ext for ext in filename_parts[1:])
439
    if attempt is None:
440
        extra = ' %s' % ''.join(random.choice(fs_safe_characters) for i in range(8))
441
    else:
442
        extra = ' (%d)' % attempt
443
    return '%s%s%s' % (name, extra, ext)
444