Completed
Push — 0.5.3 ( e84307...a551a8 )
by Felipe A.
01:14
created

Directory.name()   A

Complexity

Conditions 1

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
dl 0
loc 9
rs 9.6666
c 0
b 0
f 0
1
#!/usr/bin/env python
2
# -*- coding: UTF-8 -*-
3
4
import os
5
import os.path
6
import re
7
import shutil
8
import codecs
9
import threading
10
import string
11
import tarfile
12
import random
13
import datetime
14
import logging
15
16
from flask import current_app, send_from_directory
17
from werkzeug.utils import cached_property
18
19
from . import compat
20
from .compat import range
21
22
23
logger = logging.getLogger(__name__)

# Underscore as a text string: when compat.PY_LEGACY is true the str literal
# is decoded from UTF-8 first, otherwise the literal is used as-is.
unicode_underscore = '_'.decode('utf-8') if compat.PY_LEGACY else '_'

# Name of the codec error handler registered below, namespaced with this
# module's name.
underscore_replace = '%s:underscore' % __name__

# The handler substitutes an underscore and resumes one position after the
# start of the error, so offending positions are replaced one at a time.
codecs.register_error(
    underscore_replace,
    lambda error: (unicode_underscore, error.start + 1)
    )

# Unit suffixes used by fmt_size, smallest first.
binary_units = ("B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB")
standard_units = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")

# Separators recognised by generic_filename regardless of the running OS.
common_path_separators = '\\/'

# Characters replaced with underscores by clean_restricted_chars.
restricted_chars = '\\/\0'

# Names rejected by check_forbidden_filename on every OS.
restricted_names = ('.', '..', '::', os.sep)

# Names additionally rejected by check_forbidden_filename for 'nt'.
nt_device_names = ('CON', 'AUX', 'COM1', 'COM2', 'COM3', 'COM4', 'LPT1',
                   'LPT2', 'LPT3', 'PRN', 'NUL')

# Alphabet for the random suffix generated by alternative_filename.
fs_safe_characters = string.ascii_uppercase + string.digits
37
38
39
class Node(object):
    '''
    Abstract filesystem node class.

    This represents an unspecified entity with a filesystem's path suitable for
    being inherited by plugins.

    When inheriting, the following attributes should be overwritten in order
    to specify :meth:`from_urlpath` classmethod behavior:

    * :attr:`generic`, if true, an instance of directory_class or file_class
      will be created instead of an instance of this class itself.
    * :attr:`directory_class`, class will be used for directory nodes,
    * :attr:`file_class`, class will be used for file nodes.
    '''
    generic = True
    directory_class = None  # set later at import time
    file_class = None  # set later at import time

    re_charset = re.compile('; charset=(?P<charset>[^;]+)')
    can_download = False

    @cached_property
    def plugin_manager(self):
        '''
        Get current app's plugin manager.

        :returns: plugin manager instance
        '''
        return self.app.extensions['plugin_manager']

    @cached_property
    def widgets(self):
        '''
        List widgets with filter return True for this node (or without filter).

        Remove button is prepended if :attr:`can_remove` is true.

        :returns: list of widgets
        :rtype: list of namedtuple instances
        '''
        widgets = []
        if self.can_remove:
            widgets.append(
                self.plugin_manager.create_widget(
                    'entry-actions',
                    'button',
                    file=self,
                    css='remove',
                    endpoint='remove'
                    )
                )
        return widgets + self.plugin_manager.get_widgets(file=self)

    @cached_property
    def link(self):
        '''
        Get last widget with place "entry-link".

        :returns: widget on entry-link (ideally a link one), None if there
                  is no such widget
        :rtype: namedtuple instance or None
        '''
        link = None
        for widget in self.widgets:
            if widget.place == 'entry-link':
                link = widget  # keep iterating: the last match wins
        return link

    @cached_property
    def can_remove(self):
        '''
        Get if current node can be removed based on app config's
        directory_remove.

        :returns: True if current node can be removed, False otherwise.
        :rtype: bool
        '''
        dirbase = self.app.config["directory_remove"]
        # bool() keeps the documented return type when the option is unset
        return bool(dirbase) and check_under_base(self.path, dirbase)

    @cached_property
    def stats(self):
        '''
        Get current stats object as returned by os.stat function.

        :returns: stats object
        :rtype: posix.stat_result or nt.stat_result
        '''
        return os.stat(self.path)

    @cached_property
    def parent(self):
        '''
        Get parent node if available based on app config's directory_base.

        :returns: parent object, or None when this node is directory_base
                  itself or has no path
        :rtype: Node instance or None
        '''
        if self.path == self.app.config['directory_base']:
            return None
        parent = os.path.dirname(self.path) if self.path else None
        return self.directory_class(parent, self.app) if parent else None

    @cached_property
    def ancestors(self):
        '''
        Get list of ancestors until app config's directory_base is reached.

        :returns: list of ancestors starting from nearest.
        :rtype: list of Node objects
        '''
        ancestors = []
        parent = self.parent
        while parent:
            ancestors.append(parent)
            parent = parent.parent
        return ancestors

    @property
    def modified(self):
        '''
        Get human-readable last modification date-time.

        :returns: date-time string formatted as ``YYYY.MM.DD HH:MM:SS``
                  (without timezone)
        :rtype: str
        '''
        dt = datetime.datetime.fromtimestamp(self.stats.st_mtime)
        return dt.strftime('%Y.%m.%d %H:%M:%S')

    @property
    def urlpath(self):
        '''
        Get the url substring corresponding to this node for those endpoints
        accepting a 'path' parameter, suitable for :meth:`from_urlpath`.

        :returns: relative-url-like for node's path
        :rtype: str
        '''
        return abspath_to_urlpath(self.path, self.app.config['directory_base'])

    @property
    def name(self):
        '''
        Get the basename portion of node's path.

        :returns: filename
        :rtype: str
        '''
        return os.path.basename(self.path)

    @property
    def type(self):
        '''
        Get the mime portion of node's mimetype (without the encoding part).

        :returns: mimetype
        :rtype: str
        '''
        return self.mimetype.split(";", 1)[0]

    @property
    def category(self):
        '''
        Get mimetype category (first portion of mimetype before the slash).

        :returns: mimetype category
        :rtype: str

        As of 2016-11-03's revision of RFC2046 it could be one of the
        following:
            * application
            * audio
            * example
            * image
            * message
            * model
            * multipart
            * text
            * video
        '''
        return self.type.split('/', 1)[0]

    def __init__(self, path=None, app=None, **defaults):
        '''
        :param path: local path
        :type path: str
        :param app: optional app instance, defaults to flask's current_app
        :type app: flask.app
        :param **defaults: attributes will be set to object
        '''
        self.path = compat.fsdecode(path) if path else None
        self.app = current_app if app is None else app
        self.__dict__.update(defaults)  # only for attr and cached_property

    def remove(self):
        '''
        Does nothing except raising if can_remove property returns False.

        :raises OutsideRemovableBase: if :attr:`can_remove` is false
        '''
        if not self.can_remove:
            raise OutsideRemovableBase("File outside removable base")

    @classmethod
    def from_urlpath(cls, path, app=None):
        '''
        Alternative constructor which accepts a path as taken from URL and uses
        the given app or the current app config to get the real path.

        If class has attribute `generic` set to True, `directory_class` or
        `file_class` will be used as type.

        :param path: relative path as from URL
        :param app: optional, flask application
        :return: node object pointing to path
        :rtype: Node
        '''
        # same defaulting rule as __init__
        app = current_app if app is None else app
        base = app.config['directory_base']
        path = urlpath_to_abspath(path, base)
        if not cls.generic:
            kls = cls
        elif os.path.isdir(path):
            kls = cls.directory_class
        else:
            kls = cls.file_class
        return kls(path=path, app=app)

    @classmethod
    def register_file_class(cls, kls):
        '''
        Convenience method for setting current class file_class property.

        :param kls: class to set
        :type kls: type
        :returns: given class (enabling using this as decorator)
        :rtype: type
        '''
        cls.file_class = kls
        return kls

    @classmethod
    def register_directory_class(cls, kls):
        '''
        Convenience method for setting current class directory_class property.

        :param kls: class to set
        :type kls: type
        :returns: given class (enabling using this as decorator)
        :rtype: type
        '''
        cls.directory_class = kls
        return kls
292
293
294
@Node.register_file_class
class File(Node):
    '''
    Filesystem file class.

    Some notes:

    * :attr:`can_download` is fixed to True, so Files can be downloaded
      unconditionally.
    * :attr:`can_upload` is fixed to False, so nothing can be uploaded to
      file path.
    * :attr:`is_directory` is fixed to False, so no further checks are
      performed.
    * :attr:`generic` is set to False, so class method :meth:`from_urlpath`
      will always return instances of this class.
    '''
    can_download = True
    can_upload = False
    is_directory = False
    generic = False

    @cached_property
    def widgets(self):
        '''
        List widgets with filter return True for this file (or without filter).

        Entry link is prepended.
        Download button is prepended if :attr:`can_download` is true.
        Remove button is prepended if :attr:`can_remove` is true.

        :returns: list of widgets
        :rtype: list of namedtuple instances
        '''
        widgets = [
            self.plugin_manager.create_widget(
                'entry-link',
                'link',
                file=self,
                endpoint='open'
                )
            ]
        if self.can_download:
            widgets.append(
                self.plugin_manager.create_widget(
                    'entry-actions',
                    'button',
                    file=self,
                    css='download',
                    endpoint='download_file'
                    )
                )
        return widgets + super(File, self).widgets

    @cached_property
    def mimetype(self):
        '''
        Get full mimetype, with encoding if available.

        :returns: mimetype
        :rtype: str
        '''
        return self.plugin_manager.get_mimetype(self.path)

    @cached_property
    def is_file(self):
        '''
        Get if node is file.

        :returns: True if file, False otherwise
        :rtype: bool
        '''
        return os.path.isfile(self.path)

    @property
    def size(self):
        '''
        Get human-readable node size.

        The value comes from ``st_size`` of :attr:`stats`, scaled according
        to app config's ``use_binary_multiples``.

        :returns: fuzzy size with unit
        :rtype: str
        '''
        size, unit = fmt_size(
            self.stats.st_size,
            self.app.config["use_binary_multiples"]
            )
        # plain byte counts are shown without decimals
        if unit == binary_units[0]:
            return "%d %s" % (size, unit)
        return "%.2f %s" % (size, unit)

    @property
    def encoding(self):
        '''
        Get encoding part of mimetype, or "default" if not available.

        :returns: file encoding as found in mimetype's charset parameter,
                  or "default"
        :rtype: str
        '''
        if ";" in self.mimetype:
            match = self.re_charset.search(self.mimetype)
            gdict = match.groupdict() if match else {}
            return gdict.get("charset") or "default"
        return "default"

    def remove(self):
        '''
        Remove file.

        :raises OutsideRemovableBase: when not under removable base directory
        '''
        super(File, self).remove()
        os.unlink(self.path)

    def download(self):
        '''
        Get a Flask Response object sending this file as an attachment.

        :returns: Response object as returned by flask's send_from_directory
        :rtype: flask.Response
        '''
        directory, name = os.path.split(self.path)
        return send_from_directory(directory, name, as_attachment=True)
415
416
417
@Node.register_directory_class
class Directory(Node):
    '''
    Filesystem directory class.

    Some notes:

    * :attr:`mimetype` is fixed to 'inode/directory', so mimetype detection
      functions won't be called in this case.
    * :attr:`is_file` is fixed to False, so no further checks are needed.
    * :attr:`size` is fixed to 0 (zero), so stats are not required for this.
    * :attr:`encoding` is fixed to 'default'.
    * :attr:`generic` is set to False, so class method :meth:`from_urlpath`
      will always return instances of this class.
    '''
    _listdir_cache = None
    mimetype = 'inode/directory'
    is_file = False
    size = 0
    encoding = 'default'
    generic = False

    @property
    def name(self):
        '''
        Get the basename portion of directory's path.

        Falls back to the whole path when the basename is empty.

        :returns: filename
        :rtype: str
        '''
        return super(Directory, self).name or self.path

    @cached_property
    def widgets(self):
        '''
        List widgets with filter return True for this dir (or without filter).

        Entry link is prepended.
        Upload scripts and widget are added if :attr:`can_upload` is true.
        Download button is prepended if :attr:`can_download` is true.
        Remove button is prepended if :attr:`can_remove` is true.

        :returns: list of widgets
        :rtype: list of namedtuple instances
        '''
        widgets = [
            self.plugin_manager.create_widget(
                'entry-link',
                'link',
                file=self,
                endpoint='browse'
                )
            ]
        if self.can_upload:
            widgets.extend((
                self.plugin_manager.create_widget(
                    'head',
                    'script',
                    file=self,
                    endpoint='static',
                    filename='browse.directory.head.js'
                ),
                self.plugin_manager.create_widget(
                    'scripts',
                    'script',
                    file=self,
                    endpoint='static',
                    filename='browse.directory.body.js'
                ),
                self.plugin_manager.create_widget(
                    'header',
                    'upload',
                    file=self,
                    text='Upload',
                    endpoint='upload'
                    )
                ))
        if self.can_download:
            widgets.append(
                self.plugin_manager.create_widget(
                    'entry-actions',
                    'button',
                    file=self,
                    css='download',
                    endpoint='download_directory'
                    )
                )
        return widgets + super(Directory, self).widgets

    @cached_property
    def is_directory(self):
        '''
        Get if path points to a real directory.

        :returns: True if real directory, False otherwise
        :rtype: bool
        '''
        return os.path.isdir(self.path)

    @cached_property
    def can_download(self):
        '''
        Get if path is downloadable (if app's `directory_downloadable` config
        property is True).

        :returns: True if downloadable, False otherwise
        :rtype: bool
        '''
        return self.app.config['directory_downloadable']

    @cached_property
    def can_upload(self):
        '''
        Get if a file can be uploaded to path (if directory path is under app's
        `directory_upload` config property).

        :returns: True if a file can be upload to directory, False otherwise
        :rtype: bool
        '''
        dirbase = self.app.config["directory_upload"]
        # bool() keeps the documented return type when the option is unset
        return bool(dirbase) and check_base(self.path, dirbase)

    @cached_property
    def is_empty(self):
        '''
        Get if directory is empty (based on :meth:`_listdir`).

        :returns: True if this directory has no entries, False otherwise.
        :rtype: bool
        '''
        if self._listdir_cache is not None:
            # an empty cached listing means an empty directory
            return not self._listdir_cache
        for entry in self._listdir():
            return False  # one entry is enough, stop scanning
        return True

    def remove(self):
        '''
        Remove directory tree.

        :raises OutsideRemovableBase: when not under removable base directory
        '''
        super(Directory, self).remove()
        shutil.rmtree(self.path)

    def download(self):
        '''
        Get a Flask Response object streaming a tarball of this directory.

        :returns: Response object
        :rtype: flask.Response
        '''
        return self.app.response_class(
            TarFileStream(
                self.path,
                self.app.config["directory_tar_buffsize"]
                ),
            mimetype="application/octet-stream"
            )

    def contains(self, filename):
        '''
        Check if directory contains an entry with given filename.

        :param filename: filename will be check
        :type filename: str
        :returns: True if exists, False otherwise.
        :rtype: bool
        '''
        return os.path.exists(os.path.join(self.path, filename))

    def choose_filename(self, filename, attempts=999):
        '''
        Get a new filename which does not collide with any entry on directory,
        based on given filename.

        Numbered alternatives (starting at 2) are tried first; once those
        are exhausted, randomly suffixed names are tried until one is free.

        :param filename: base filename
        :type filename: str
        :param attempts: number of attempts, defaults to 999
        :type attempts: int
        :returns: filename
        :rtype: str
        '''
        new_filename = filename
        for attempt in range(2, attempts + 1):
            if not self.contains(new_filename):
                return new_filename
            new_filename = alternative_filename(filename, attempt)
        while self.contains(new_filename):
            new_filename = alternative_filename(filename)
        return new_filename

    def _listdir(self, precomputed_stats=os.name == 'nt'):
        '''
        Iter unsorted entries on this directory.

        Entries raising OSError while being inspected are logged and
        skipped.

        :param precomputed_stats: whether to pass each non-symlink entry's
                                  stat result to the created node, defaults
                                  to True only when os.name is 'nt'
        :type precomputed_stats: bool
        :yields: Directory or File instance for each entry in directory
        :ytype: Node
        '''
        for entry in compat.scandir(self.path):
            kwargs = {'path': entry.path, 'app': self.app, 'parent': self}
            try:
                if precomputed_stats and not entry.is_symlink():
                    kwargs['stats'] = entry.stat()
                if entry.is_dir(follow_symlinks=True):
                    yield self.directory_class(**kwargs)
                    continue
                yield self.file_class(**kwargs)
            except OSError as e:
                logger.exception(e)

    def listdir(self, sortkey=None, reverse=False):
        '''
        Get sorted list (by given sortkey and reverse params) of File objects.

        The result is cached on first call; later calls return that same
        list regardless of the sortkey and reverse values they receive.

        :param sortkey: optional key function passed to sorted
        :param reverse: whether to reverse the resulting order
        :type reverse: bool
        :return: sorted list of File instances
        :rtype: list of File
        '''
        if self._listdir_cache is None:
            if sortkey:
                data = sorted(self._listdir(), key=sortkey, reverse=reverse)
            else:
                data = list(self._listdir())
                if reverse:
                    # reversed() rejects generators, so reverse the list
                    data.reverse()
            self._listdir_cache = data
        return self._listdir_cache
644
645
646
class TarFileStream(object):
    '''
    Tarfile which compresses while reading for streaming.

    Buffsize can be provided, it must be 512 multiple (the tar block size) for
    compression.

    Note on coroutines: this class uses threading by default, but
    coroutine-based applications can change this behavior overriding the
    :attr:`event_class` and :attr:`thread_class` values.
    '''
    event_class = threading.Event
    thread_class = threading.Thread
    tarfile_class = tarfile.open

    def __init__(self, path, buffsize=10240):
        '''
        Internal tarfile object will be created, and compression will start
        on a thread until buffer became full with writes becoming locked until
        a read occurs.

        :param path: local path of directory whose content will be compressed.
        :type path: str
        :param buffsize: size of internal buffer on bytes, defaults to 10KiB
        :type buffsize: int
        '''
        self.path = path
        self.name = os.path.basename(path) + ".tgz"

        self._finished = 0  # 0: producing, 1: done, 2: EOF already reported
        self._want = 0
        self._data = bytes()
        self._add = self.event_class()  # set by read: writer may append
        self._result = self.event_class()  # set by writer: data is ready
        self._tarfile = self.tarfile_class(  # stream write
            fileobj=self,
            mode="w|gz",
            bufsize=buffsize
            )
        self._th = self.thread_class(target=self.fill)
        self._th.start()

    def fill(self):
        '''
        Writes data on internal tarfile instance, which writes to current
        object, using :meth:`write`.

        As this method is blocking, it is used inside a thread.

        This method is called automatically, on a thread, on initialization,
        so there is little need to call it manually.
        '''
        self._tarfile.add(self.path, "")
        self._tarfile.close()  # force stream flush
        self._finished += 1
        if not self._result.is_set():
            self._result.set()  # wake a reader waiting for more data

    def write(self, data):
        '''
        Write method used by internal tarfile instance to output data.
        This method blocks tarfile execution once internal buffer is full.

        As this method is blocking, it is used inside the same thread of
        :meth:`fill`.

        :param data: bytes to write to internal buffer
        :type data: bytes
        :returns: number of bytes written
        :rtype: int
        '''
        self._add.wait()
        self._data += data
        if len(self._data) > self._want:
            # enough data for the pending read: pause writes, wake reader
            self._add.clear()
            self._result.set()
        return len(data)

    def _take(self, want):
        '''
        Pop data from the front of the internal buffer.

        :param want: number of bytes to pop, 0 meaning the whole buffer
        :type want: int
        :returns: popped data
        :rtype: bytes
        '''
        if want:
            data = self._data[:want]
            self._data = self._data[want:]
        else:
            data = self._data
            self._data = bytes()
        return data

    def read(self, want=0):
        '''
        Read method, gets data from internal buffer while releasing
        :meth:`write` locks when needed.

        The lock usage means it must run on a different thread than
        :meth:`fill`, ie. the main thread, otherwise will deadlock.

        The combination of both write and this method running on different
        threads makes tarfile being streamed on-the-fly, with data chunks being
        processed and retrieved on demand.

        Once compression is over, any data still buffered is returned
        first, then a single empty bytes object is returned, and any
        further call raises.

        :param want: number bytes to read, defaults to 0 (all available)
        :type want: int
        :returns: tarfile data as bytes
        :rtype: bytes
        :raises EOFError: when called again after the end of stream has
                          already been reported
        '''
        if self._finished:
            if self._data:
                # nothing else will be written: drain without waiting
                return self._take(want)
            if self._finished == 1:
                self._finished += 1
                return bytes()
            raise EOFError("EOF reached")

        # Thread communication
        self._want = want
        self._add.set()
        self._result.wait()
        self._result.clear()

        return self._take(want)

    def __iter__(self):
        '''
        Iterate through tarfile result chunks.

        Similarly to :meth:`read`, this method must run on a different thread
        than :meth:`write` calls.

        :yields: data chunks as taken from :meth:`read`.
        :ytype: bytes
        '''
        data = self.read()
        while data:
            yield data
            data = self.read()
775
776
777
class OutsideDirectoryBase(Exception):
    '''
    Exception raised when trying to access a path outside the directory
    defined on `directory_base` config property.
    '''
783
784
785
class OutsideRemovableBase(Exception):
    '''
    Exception raised when trying to remove a path outside the directory
    defined on `directory_remove` config property.
    '''
791
792
793
def fmt_size(size, binary=True):
    '''
    Get size and unit.

    The size is divided by 1024 (binary) or 1000 (standard) until it drops
    below 1000 or the largest unit is reached.

    :param size: size in bytes
    :param binary: whether use binary or standard units, defaults to True
    :return: size and unit; size is returned untouched when below 1000,
             and as a float after any division
    :rtype: tuple of number and unit as str
    '''
    if binary:
        units, divider = binary_units, 1024.
    else:
        units, divider = standard_units, 1000.
    for unit in units[:-1]:
        # NOTE(review): 1000 is used for both unit systems; presumably to
        # keep the integer part within three digits - confirm.
        if size < 1000:
            return (size, unit)
        size /= divider
    return size, units[-1]
813
814
815
def relativize_path(path, base, os_sep=os.sep):
    '''
    Make absolute path relative to an absolute base.

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: relative path, empty when path is the base itself
    :rtype: str or unicode
    :raises OutsideDirectoryBase: if path is not below base
    '''
    if not check_base(path, base, os_sep):
        raise OutsideDirectoryBase("%r is not under %r" % (path, base))
    # strip the base plus exactly one separator following it
    prefix_len = len(base)
    if not base.endswith(os_sep):
        prefix_len += len(os_sep)
    return path[prefix_len:]
832
833
834
def abspath_to_urlpath(path, base, os_sep=os.sep):
    '''
    Make filesystem absolute path uri relative using given absolute base path.

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: relative uri, using forward slashes as separator
    :rtype: str or unicode
    :raises OutsideDirectoryBase: if resulting path is not below base
    '''
    relative = relativize_path(path, base, os_sep)
    return relative.replace(os_sep, '/')
846
847
848
def urlpath_to_abspath(path, base, os_sep=os.sep):
    '''
    Make uri relative path fs absolute using a given absolute base path.

    :param path: relative path
    :param base: absolute base path, with or without trailing separator
    :param os_sep: path component separator, defaults to current OS separator
    :return: absolute path
    :rtype: str or unicode
    :raises OutsideDirectoryBase: if resulting path is not below base
    '''
    prefix = base if base.endswith(os_sep) else base + os_sep
    realpath = os.path.abspath(prefix + path.replace('/', os_sep))
    # abspath drops any trailing separator, so compare with one re-appended:
    # this accepts the base directory itself even when base was given with
    # a trailing separator.
    terminated = realpath if realpath.endswith(os_sep) else realpath + os_sep
    if terminated.startswith(prefix):
        return realpath
    raise OutsideDirectoryBase("%r is not under %r" % (realpath, base))
864
865
866
def generic_filename(path):
    '''
    Extract filename of given path os-independently, taking care of known path
    separators.

    :param path: path
    :return: filename
    :rtype: str or unicode (depending on given path)
    '''
    for sep in common_path_separators:
        if sep in path:
            # keep only what follows the last occurrence of this separator
            path = path.rsplit(sep, 1)[1]
    return path
880
881
882
def clean_restricted_chars(path, restricted_chars=restricted_chars):
    '''
    Get path without restricted characters.

    Every restricted character is replaced by an underscore.

    :param path: path
    :param restricted_chars: characters to replace, defaults to module's
                             restricted_chars
    :return: path without restricted characters
    :rtype: str or unicode (depending on given path)
    '''
    for character in restricted_chars:
        path = path.replace(character, '_')
    return path
893
894
895
def check_forbidden_filename(filename,
                             destiny_os=os.name,
                             restricted_names=restricted_names):
    '''
    Get if given filename is forbidden for current OS or filesystem.

    :param filename: filename to check
    :param destiny_os: destination operative system
    :param restricted_names: names forbidden on any OS, defaults to module's
                             restricted_names
    :return: whether is forbidden on given OS (or filesystem) or not
    :rtype: bool
    '''
    if destiny_os == 'nt':
        # compare only the part before the first dot, case-insensitively
        first_component = filename.split('.', 1)[0].upper()
        if first_component in nt_device_names:
            return True
    return filename in restricted_names
913
914
915
def check_base(path, base, os_sep=os.sep):
    '''
    Check if given absolute path is the given base or is under it.

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: whether path is the given base (or under it) or not
    :rtype: bool
    '''
    # drop one trailing separator so the equality test below can match
    if base.endswith(os_sep):
        base = base[:-len(os_sep)]
    return path == base or check_under_base(path, base, os_sep)
926
927
928
def check_under_base(path, base, os_sep=os.sep):
    '''
    Check if given absolute path is under given base.

    The base itself is not considered to be under the base.

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: whether file is under given base or not
    :rtype: bool
    '''
    # a trailing separator avoids matching siblings sharing a name prefix
    prefix = base if base.endswith(os_sep) else base + os_sep
    return path.startswith(prefix)
939
940
941
def secure_filename(path, destiny_os=os.name, fs_encoding=compat.FS_ENCODING):
    '''
    Get rid of parent path components and special filenames.

    If path is invalid or protected, return empty string.

    :param path: unsafe path
    :type path: str
    :param destiny_os: destination operative system
    :type destiny_os: str
    :param fs_encoding: destination filesystem filename encoding
    :type fs_encoding: str
    :return: filename or empty string
    :rtype: str
    '''
    path = generic_filename(path)
    path = clean_restricted_chars(path)

    if check_forbidden_filename(path, destiny_os=destiny_os):
        return ''

    if isinstance(path, bytes):
        path = path.decode('latin-1', errors=underscore_replace)

    # Decode and recover from filesystem encoding in order to strip unwanted
    # characters out
    kwargs = dict(
        os_name=destiny_os,
        fs_encoding=fs_encoding,
        errors=underscore_replace
        )
    fs_encoded_path = compat.fsencode(path, **kwargs)
    fs_decoded_path = compat.fsdecode(fs_encoded_path, **kwargs)
    return fs_decoded_path
973
974
975
def alternative_filename(filename, attempt=None):
    '''
    Generates an alternative version of given filename.

    If a numeric attempt parameter is given, it will be used on the
    alternative name, a random value will be used otherwise.

    The extra text is inserted before the extension, where up to the last
    two dot-separated components count as extension.

    :param filename: original filename
    :param attempt: optional attempt number, defaults to None
    :return: new filename
    :rtype: str or unicode
    '''
    filename_parts = filename.rsplit(u'.', 2)
    name = filename_parts[0]
    ext = ''.join(u'.%s' % part for part in filename_parts[1:])
    if attempt is None:
        choose = random.choice
        extra = u' %s' % ''.join(choose(fs_safe_characters) for i in range(8))
    else:
        extra = u' (%d)' % attempt
    return u'%s%s%s' % (name, extra, ext)
996