Completed
Push — master ( c18a49...b09083 )
by Felipe A.
53s
created

browsepy.file.File.mtime()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 3
rs 10
1
#!/usr/bin/env python
2
# -*- coding: UTF-8 -*-
3
4
import sys
5
import os
6
import os.path
7
import re
8
import shutil
9
import codecs
10
import threading
11
import string
12
import tarfile
13
import random
14
import datetime
15
16
from flask import current_app, send_from_directory, Response
17
from werkzeug.utils import cached_property
18
19
from ..compat import PY_LEGACY, range, FileNotFoundError
20
from .mimetype import detect_mimetype
21
22
undescore_replace = '%s:underscore' % __name__
23
codecs.register_error(undescore_replace,
24
                      (lambda error: (u'_', error.start + 1))
25
                      if PY_LEGACY else
26
                      (lambda error: ('_', error.start + 1))
27
                      )
28
29
class File(object):
30
    re_charset = re.compile('; charset=(?P<charset>[^;]+)')
31
    def __init__(self, path, app=None):
32
        self.path = path
33
        self.app = current_app if app is None else app
34
35
    def remove(self):
36
        if not self.can_remove:
37
            raise OutsideRemovableBase("File outside removable base")
38
        if self.is_directory:
39
            shutil.rmtree(self.path)
40
        else:
41
            os.unlink(self.path)
42
43
    def download(self):
44
        if self.is_directory:
45
            stream = TarFileStream(
46
                self.path,
47
                self.app.config["directory_tar_buffsize"]
48
                )
49
            return Response(stream, mimetype="application/octet-stream")
50
        directory, name = os.path.split(self.path)
51
        return send_from_directory(directory, name, as_attachment=True)
52
53
    def contains(self, filename):
54
        return os.path.exists(os.path.join(self.path, filename))
55
56
    def choose_filename(self, filename, attempts=999):
57
        new_filename = filename
58
        for attempt in range(2, attempts+1):
59
            if not self.contains(new_filename):
60
                return new_filename
61
            new_filename = alternative_filename(filename, attempt)
62
        while self.contains(new_filename):
63
            new_filename = alternative_filename(filename)
64
        return new_filename
65
66
    @property
67
    def actions(self):
68
        plugin_manager = self.app.extensions['plugin_manager']
69
        return plugin_manager.get_actions(self.mimetype)
70
71
    @cached_property
72
    def can_download(self):
73
        return self.app.config['directory_downloadable'] or not self.is_directory
74
75
    @cached_property
76
    def can_remove(self):
77
        dirbase = self.app.config["directory_remove"]
78
        if dirbase:
79
            return self.path.startswith(dirbase + os.sep)
80
        return False
81
82
    @cached_property
83
    def can_upload(self):
84
        dirbase = self.app.config["directory_upload"]
85
        if self.is_directory and dirbase:
86
            return dirbase == self.path or self.path.startswith(dirbase + os.sep)
87
        return False
88
89
    @cached_property
90
    def stats(self):
91
        return os.stat(self.path)
92
93
    @cached_property
94
    def mimetype(self):
95
        if self.is_directory:
96
            return 'inode/directory'
97
        return detect_mimetype(self.path)
98
99
    @cached_property
100
    def is_directory(self):
101
        return os.path.isdir(self.path)
102
103
    @cached_property
104
    def parent(self):
105
        return File(os.path.dirname(self.path))
106
107
    @property
108
    def mtime(self):
109
        return self.stats.st_mtime
110
111
    @property
112
    def modified(self):
113
        return datetime.datetime.fromtimestamp(self.mtime).strftime('%Y.%m.%d %H:%M:%S')
114
115
    @property
116
    def size(self):
117
        size, unit = fmt_size(self.stats.st_size, self.app.config["use_binary_multiples"])
118
        if unit == binary_units[0]:
119
            return "%d %s" % (size, unit)
120
        return "%.2f %s" % (size, unit)
121
122
    @property
123
    def relpath(self):
124
        return relativize_path(self.path, self.app.config['directory_base'])
125
126
    @property
127
    def basename(self):
128
        return os.path.basename(self.path)
129
130
    @property
131
    def dirname(self):
132
        return os.path.dirname(self.path)
133
134
    @property
135
    def type(self):
136
        return self.mimetype.split(";", 1)[0]
137
138
    @property
139
    def encoding(self):
140
        if ";" in self.mimetype:
141
            match = self.re_charset.search(self.mimetype)
142
            gdict = match.groupdict() if match else {}
143
            return gdict.get("charset") or "default"
144
        return "default"
145
146
    @classmethod
147
    def listdir_order(cls, path):
148
        return not os.path.isdir(path), os.path.basename(path).lower()
149
150
    def listdir(self):
151
        pjoin = os.path.join # minimize list comprehension overhead
152
        content = [pjoin(self.path, i) for i in os.listdir(self.path)]
153
        content.sort(key=self.listdir_order)
154
        for i in content:
155
            yield self.__class__(i)
156
157
158
class TarFileStream(object):
159
    '''
160
    Tarfile which compresses while reading for streaming.
161
162
    Buffsize can be provided, it must be 512 multiple (the tar block size) for
163
    compression.
164
    '''
165
    event_class = threading.Event
166
    thread_class = threading.Thread
167
    tarfile_class = tarfile.open
168
169
    def __init__(self, path, buffsize=10240):
170
        self.path = path
171
        self.name = os.path.basename(path) + ".tgz"
172
173
        self._finished = 0
174
        self._want = 0
175
        self._data = bytes()
176
        self._add = self.event_class()
177
        self._result = self.event_class()
178
        self._tarfile = self.tarfile_class(fileobj=self, mode="w|gz", bufsize=buffsize) # stream write
179
        self._th = self.thread_class(target=self.fill)
180
        self._th.start()
181
182
    def fill(self):
183
        self._tarfile.add(self.path, "")
184
        self._tarfile.close() # force stream flush
185
        self._finished += 1
186
        if not self._result.is_set():
187
            self._result.set()
188
189
    def write(self, data):
190
        self._add.wait()
191
        self._data += data
192
        if len(self._data) > self._want:
193
            self._add.clear()
194
            self._result.set()
195
        return len(data)
196
197
    def read(self, want=0):
198
        if self._finished:
199
            if self._finished == 1:
200
                self._finished += 1
201
                return ""
202
            return EOFError("EOF reached")
203
204
        # Thread communication
205
        self._want = want
206
        self._add.set()
207
        self._result.wait()
208
        self._result.clear()
209
210
        if want:
211
            data = self._data[:want]
212
            self._data = self._data[want:]
213
        else:
214
            data = self._data
215
            self._data = bytes()
216
        return data
217
218
    def __iter__(self):
219
        data = self.read()
220
        while data:
221
            yield data
222
            data = self.read()
223
224
225
class OutsideDirectoryBase(Exception):
226
    pass
227
228
229
class OutsideRemovableBase(Exception):
230
    pass
231
232
233
binary_units = ("B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB")
234
standard_units = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
235
def fmt_size(size, binary=True):
236
    '''
237
    Get size and unit.
238
239
    :param size: size in bytes
240
    :param binary: whether use binary or standard units, defaults to True
241
    :return: size and unit
242
    :rtype: tuple of int and unit as str
243
    '''
244
    if binary:
245
        fmt_sizes = binary_units
246
        fmt_divider = 1024.
247
    else:
248
        fmt_sizes = standard_units
249
        fmt_divider = 1000.
250
    for fmt in fmt_sizes[:-1]:
251
        if size < 1000:
252
            return (size, fmt)
253
        size /= fmt_divider
254
    return size, fmt_sizes[-1]
255
256
def root_path(path, os_sep=os.sep):
257
    '''
258
    Get root of given path.
259
260
    :param path: absolute path
261
    :param os_sep: path component separator, defaults to current OS separator
262
    :return: path
263
    :rtype: str or unicode
264
    '''
265
    if os_sep == '\\' and path.startswith('//'):
266
        return '//%s' % path[2:].split('/')[0]
267
    return path.split(os_sep)[0] or '/'
268
269
def relativize_path(path, base, os_sep=os.sep):
270
    '''
271
    Make absolute path relative to an absolute base.
272
273
    :param path: absolute path
274
    :param base: absolute base path
275
    :param os_sep: path component separator, defaults to current OS separator
276
    :return: relative path
277
    :rtype: str or unicode
278
    :raises OutsideDirectoryBase: if path is not below base
279
    '''
280
    prefix = os.path.commonprefix((path, base))
281
    if not prefix or prefix == root_path(base, os_sep):
282
        raise OutsideDirectoryBase("%r is not under %r" % (path, base))
283
    prefix_len = len(prefix)
284
    if not prefix.endswith(os_sep):
285
        prefix_len += len(os_sep)
286
    relpath = path[prefix_len:]
287
    return relpath
288
289
common_path_separators = '\\/'
290
def generic_filename(path):
291
    '''
292
    Extract filename of given path os-indepently, taking care of known path separators.
293
294
    :param path: path
295
    :return: filename
296
    :rtype: str or unicode (depending on given path)
297
    '''
298
299
    for sep in common_path_separators:
300
        if sep in path:
301
            _, path = path.rsplit(sep, 1)
302
    return path
303
304
restricted_chars = '\\/\0'
305
def clean_restricted_chars(path, restricted_chars=restricted_chars):
306
    '''
307
    Get path without restricted characters.
308
309
    :param path: path
310
    :return: path without restricted characters
311
    :rtype: str or unicode (depending on given path)
312
    '''
313
    for character in restricted_chars:
314
        path = path.replace(character, '_')
315
    return path
316
317
restricted_names = ('.', '..', '::', os.sep)
318
nt_device_names = ('CON', 'AUX', 'COM1', 'COM2', 'COM3', 'COM4', 'LPT1', 'LPT2', 'LPT3', 'PRN', 'NUL')
319
fs_encoding = 'unicode' if os.name == 'nt' else sys.getfilesystemencoding() or 'ascii'
320
def check_forbidden_filename(filename, destiny_os=os.name, fs_encoding=fs_encoding,
321
                             restricted_names=restricted_names):
322
    '''
323
    Get if given filename is forbidden for current OS or filesystem.
324
325
    :param filename:
326
    :param destiny_os: destination operative system
327
    :param fs_encoding: destination filesystem filename encoding
328
    :return: whether is forbidden on given OS (or filesystem) or not
329
    :rtype: bool
330
    '''
331
    if destiny_os == 'nt':
332
        fpc = filename.split('.', 1)[0].upper()
333
        if fpc in nt_device_names:
334
            return True
335
336
    return filename in restricted_names
337
338
def secure_filename(path, destiny_os=os.name, fs_encoding=fs_encoding):
339
    '''
340
    Get rid of parent path components and special filenames.
341
342
    If path is invalid or protected, return empty string.
343
344
    :param path: unsafe path
345
    :param destiny_os: destination operative system
346
    :param fs_encoding: destination filesystem filename encoding
347
    :return: filename or empty string
348
    :rtype: str or unicode (depending on python version, destiny_os and fs_encoding)
349
    '''
350
    path = generic_filename(path)
351
    path = clean_restricted_chars(path)
352
353
    if check_forbidden_filename(path, destiny_os=destiny_os, fs_encoding=fs_encoding):
354
        return ''
355
356
    if fs_encoding != 'unicode':
357
        if PY_LEGACY and not isinstance(path, unicode):
358
            path = unicode(path, encoding='latin-1')
359
        path = path.encode(fs_encoding, errors=undescore_replace).decode(fs_encoding)
360
361
    return path
362
363
fs_safe_characters = string.ascii_uppercase + string.digits
364
def alternative_filename(filename, attempt=None):
365
    '''
366
    Generates an alternative version of given filename.
367
368
    If an number attempt parameter is given, will be used on the alternative
369
    name, a random value will be used otherwise.
370
371
    :param filename: original filename
372
    :param attempt: optional attempt number, defaults to null
373
    :return: new filename
374
    :rtype: str or unicode
375
    '''
376
    filename_parts = filename.rsplit('.', 2)
377
    name = filename_parts[0]
378
    ext = ''.join('.%s' % ext for ext in filename_parts[1:])
379
    if attempt is None:
380
        extra = ' %s' % ''.join(random.choice(fs_safe_characters) for i in range(8))
381
    else:
382
        extra = ' (%d)' % attempt
383
    return '%s%s%s' % (name, extra, ext)
384