Completed
Push — 0.5.3 ( 2f400b )
by Felipe A.
01:07
created

check_base()   A

Complexity

Conditions 2

Size

Total Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
c 1
b 0
f 0
dl 0
loc 11
rs 9.4285
1
#!/usr/bin/env python
2
# -*- coding: UTF-8 -*-
3
4
import os
5
import os.path
6
import re
7
import shutil
8
import codecs
9
import threading
10
import string
11
import tarfile
12
import random
13
import datetime
14
import logging
15
16
from flask import current_app, send_from_directory
17
from werkzeug.utils import cached_property
18
19
from . import compat
20
from .compat import range
21
22
23
logger = logging.getLogger(__name__)
unicode_underscore = '_'.decode('utf-8') if compat.PY_LEGACY else '_'
underscore_replace = '%s:underscore' % __name__
# Codec error handler: replaces the first offending position with an
# underscore and resumes right after it.
codecs.register_error(underscore_replace,
                      lambda error: (unicode_underscore, error.start + 1)
                      )
binary_units = ("B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB")
standard_units = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
common_path_separators = '\\/'
restricted_chars = '\\/\0'
restricted_names = ('.', '..', '::', os.sep)
# Reserved device names on Windows: CON, PRN, AUX, NUL, COM1-9 and LPT1-9.
nt_device_names = ('CON', 'PRN', 'AUX', 'NUL',
                   'COM1', 'COM2', 'COM3', 'COM4', 'COM5',
                   'COM6', 'COM7', 'COM8', 'COM9',
                   'LPT1', 'LPT2', 'LPT3', 'LPT4', 'LPT5',
                   'LPT6', 'LPT7', 'LPT8', 'LPT9')
fs_safe_characters = string.ascii_uppercase + string.digits
37
38
39
class Node(object):
    '''
    Abstract filesystem node class.

    This represents an unspecified entity with a filesystem's path suitable for
    being inherited by plugins.

    When inheriting, the following attributes should be overwritten in order
    to specify :meth:`from_urlpath` classmethod behavior:

    * :attr:`generic`, if true, an instance of directory_class or file_class
      will be created instead of an instance of this class itself.
    * :attr:`directory_class`, class will be used for directory nodes,
    * :attr:`file_class`, class will be used for file nodes.
    '''
    generic = True
    directory_class = None  # set later at import time
    file_class = None  # set later at import time

    re_charset = re.compile('; charset=(?P<charset>[^;]+)')
    can_download = False

    @cached_property
    def plugin_manager(self):
        '''
        Get current app's plugin manager.

        :returns: plugin manager instance
        '''
        return self.app.extensions['plugin_manager']

    @cached_property
    def widgets(self):
        '''
        List widgets with filter return True for this node (or without filter).

        Remove button is prepended if :property:can_remove returns true.

        :returns: list of widgets
        :rtype: list of namedtuple instances
        '''
        widgets = []
        if self.can_remove:
            widgets.append(
                self.plugin_manager.create_widget(
                    'entry-actions',
                    'button',
                    file=self,
                    css='remove',
                    endpoint='remove'
                    )
                )
        return widgets + self.plugin_manager.get_widgets(file=self)

    @cached_property
    def link(self):
        '''
        Get last widget with place "entry-link".

        :returns: widget on entry-link (ideally a link one)
        :rtype: namedtuple instance
        '''
        link = None
        for widget in self.widgets:
            if widget.place == 'entry-link':
                link = widget
        return link

    @cached_property
    def can_remove(self):
        '''
        Get if current node can be removed based on app config's
        directory_remove.

        :returns: True if current node can be removed, False otherwise.
        :rtype: bool
        '''
        dirbase = self.app.config["directory_remove"]
        # bool() so an unset (None / empty) config value yields False
        # instead of leaking the raw config value to callers.
        return bool(dirbase) and check_under_base(self.path, dirbase)

    @cached_property
    def stats(self):
        '''
        Get current stats object as returned by os.stat function.

        :returns: stats object
        :rtype: posix.stat_result or nt.stat_result
        '''
        return os.stat(self.path)

    @cached_property
    def parent(self):
        '''
        Get parent node if available based on app config's directory_base.

        :returns: parent object if available
        :rtype: Node instance or None
        '''
        if self.path == self.app.config['directory_base']:
            return None
        parent = os.path.dirname(self.path) if self.path else None
        return self.directory_class(parent, self.app) if parent else None

    @cached_property
    def ancestors(self):
        '''
        Get list of ancestors until app config's directory_base is reached.

        :returns: list of ancestors starting from nearest.
        :rtype: list of Node objects
        '''
        ancestors = []
        parent = self.parent
        while parent:
            ancestors.append(parent)
            parent = parent.parent
        return ancestors

    @property
    def modified(self):
        '''
        Get human-readable last modification date-time.

        :returns: date-time string formatted as ``YYYY.MM.DD HH:MM:SS``
                  (without timezone)
        :rtype: str
        '''
        dt = datetime.datetime.fromtimestamp(self.stats.st_mtime)
        return dt.strftime('%Y.%m.%d %H:%M:%S')

    @property
    def urlpath(self):
        '''
        Get the url substring corresponding to this node for those endpoints
        accepting a 'path' parameter, suitable for :meth:`from_urlpath`.

        :returns: relative-url-like for node's path
        :rtype: str
        '''
        return abspath_to_urlpath(self.path, self.app.config['directory_base'])

    @property
    def name(self):
        '''
        Get the basename portion of node's path.

        :returns: filename
        :rtype: str
        '''
        return os.path.basename(self.path)

    @property
    def type(self):
        '''
        Get the mime portion of node's mimetype (without the encoding part).

        :returns: mimetype
        :rtype: str
        '''
        return self.mimetype.split(";", 1)[0]

    @property
    def category(self):
        '''
        Get mimetype category (first portion of mimetype before the slash).

        :returns: mimetype category
        :rtype: str

        As of 2016-11-03's revision of RFC2046 it could be one of the
        following:
            * application
            * audio
            * example
            * image
            * message
            * model
            * multipart
            * text
            * video
        '''
        return self.type.split('/', 1)[0]

    def __init__(self, path=None, app=None, **defaults):
        '''
        :param path: local path
        :type path: str
        :param app: optional app instance, current app is used if omitted
        :type app: flask.app
        :param **defaults: attributes will be set to object
        '''
        self.path = compat.fsdecode(path) if path else None
        self.app = current_app if app is None else app
        self.__dict__.update(defaults)  # only for attr and cached_property

    def remove(self):
        '''
        Does nothing except raising if can_remove property returns False.

        :raises: OutsideRemovableBase if :property:can_remove returns false
        '''
        if not self.can_remove:
            raise OutsideRemovableBase("File outside removable base")

    @classmethod
    def from_urlpath(cls, path, app=None):
        '''
        Alternative constructor which accepts a path as taken from URL and uses
        the given app or the current app config to get the real path.

        If class has attribute `generic` set to True, `directory_class` or
        `file_class` will be used as type.

        :param path: relative path as from URL
        :param app: optional, flask application
        :return: file object pointing to path
        :rtype: File
        '''
        app = app or current_app
        base = app.config['directory_base']
        path = urlpath_to_abspath(path, base)
        if not cls.generic:
            kls = cls
        elif os.path.isdir(path):
            kls = cls.directory_class
        else:
            kls = cls.file_class
        return kls(path=path, app=app)

    @classmethod
    def register_file_class(cls, kls):
        '''
        Convenience method for setting current class file_class property.

        :param kls: class to set
        :type kls: type
        :returns: given class (enabling using this as decorator)
        :rtype: type
        '''
        cls.file_class = kls
        return kls

    @classmethod
    def register_directory_class(cls, kls):
        '''
        Convenience method for setting current class directory_class property.

        :param kls: class to set
        :type kls: type
        :returns: given class (enabling using this as decorator)
        :rtype: type
        '''
        cls.directory_class = kls
        return kls
292
293
294
@Node.register_file_class
class File(Node):
    '''
    Filesystem file class.

    Some notes:

    * :attr:`can_download` is fixed to True, so Files can be downloaded
      unconditionally.
    * :attr:`can_upload` is fixed to False, so nothing can be uploaded to
      file path.
    * :attr:`is_directory` is fixed to False, so no further checks are
      performed.
    * :attr:`generic` is set to False, so static method :meth:`from_urlpath`
      will always return instances of this class.
    '''
    can_download = True
    can_upload = False
    is_directory = False
    generic = False

    @cached_property
    def widgets(self):
        '''
        List widgets with filter return True for this file (or without filter).

        Entry link is prepended.
        Download button is prepended if :property:can_download returns true.
        Remove button is prepended if :property:can_remove returns true.

        :returns: list of widgets
        :rtype: list of namedtuple instances
        '''
        widgets = [
            self.plugin_manager.create_widget(
                'entry-link',
                'link',
                file=self,
                endpoint='open'
                )
            ]
        if self.can_download:
            widgets.append(
                self.plugin_manager.create_widget(
                    'entry-actions',
                    'button',
                    file=self,
                    css='download',
                    endpoint='download_file'
                    )
                )
        return widgets + super(File, self).widgets

    @cached_property
    def mimetype(self):
        '''
        Get full mimetype, with encoding if available.

        :returns: mimetype
        :rtype: str
        '''
        return self.plugin_manager.get_mimetype(self.path)

    @cached_property
    def is_file(self):
        '''
        Get if node is file.

        :returns: True if file, False otherwise
        :rtype: bool
        '''
        return os.path.isfile(self.path)

    @property
    def size(self):
        '''
        Get human-readable node size, scaled to a suitable unit.

        Binary or standard units are chosen based on app config's
        `use_binary_multiples`.

        :returns: fuzzy size with unit
        :rtype: str
        '''
        size, unit = fmt_size(
            self.stats.st_size,
            self.app.config["use_binary_multiples"]
            )
        # Plain bytes are never fractional, so no decimals are shown.
        if unit == binary_units[0]:
            return "%d %s" % (size, unit)
        return "%.2f %s" % (size, unit)

    @property
    def encoding(self):
        '''
        Get encoding part of mimetype, or "default" if not available.

        :returns: file encoding as returned by mimetype function or "default"
        :rtype: str
        '''
        if ";" in self.mimetype:
            match = self.re_charset.search(self.mimetype)
            gdict = match.groupdict() if match else {}
            return gdict.get("charset") or "default"
        return "default"

    def remove(self):
        '''
        Remove file.

        :raises OutsideRemovableBase: when not under removable base directory
        '''
        super(File, self).remove()
        os.unlink(self.path)

    def download(self):
        '''
        Get a Flask Response object sending this file as an attachment.

        :returns: Response object as returned by flask's send_from_directory
        :rtype: flask.Response
        '''
        directory, name = os.path.split(self.path)
        return send_from_directory(directory, name, as_attachment=True)
415
416
417
@Node.register_directory_class
class Directory(Node):
    '''
    Filesystem directory class.

    Some notes:

    * :attr:`mimetype` is fixed to 'inode/directory', so mimetype detection
      functions won't be called in this case.
    * :attr:`is_file` is fixed to False, so no further checks are needed.
    * :attr:`size` is fixed to 0 (zero), so stats are not required for this.
    * :attr:`encoding` is fixed to 'default'.
    * :attr:`generic` is set to False, so static method :meth:`from_urlpath`
      will always return instances of this class.
    '''
    _listdir_cache = None
    mimetype = 'inode/directory'
    is_file = False
    size = 0
    encoding = 'default'
    generic = False

    @cached_property
    def widgets(self):
        '''
        List widgets with filter return True for this dir (or without filter).

        Entry link is prepended.
        Upload scripts and widget are added if :property:can_upload is true.
        Download button is prepended if :property:can_download returns true.
        Remove button is prepended if :property:can_remove returns true.

        :returns: list of widgets
        :rtype: list of namedtuple instances
        '''
        widgets = [
            self.plugin_manager.create_widget(
                'entry-link',
                'link',
                file=self,
                endpoint='browse'
                )
            ]
        if self.can_upload:
            widgets.extend((
                self.plugin_manager.create_widget(
                    'head',
                    'script',
                    file=self,
                    endpoint='static',
                    filename='browse.directory.head.js'
                ),
                self.plugin_manager.create_widget(
                    'scripts',
                    'script',
                    file=self,
                    endpoint='static',
                    filename='browse.directory.body.js'
                ),
                self.plugin_manager.create_widget(
                    'header',
                    'upload',
                    file=self,
                    text='Upload',
                    endpoint='upload'
                    )
                ))
        if self.can_download:
            widgets.append(
                self.plugin_manager.create_widget(
                    'entry-actions',
                    'button',
                    file=self,
                    css='download',
                    endpoint='download_directory'
                    )
                )
        return widgets + super(Directory, self).widgets

    @cached_property
    def is_directory(self):
        '''
        Get if path points to a real directory.

        :returns: True if real directory, False otherwise
        :rtype: bool
        '''
        return os.path.isdir(self.path)

    @cached_property
    def can_download(self):
        '''
        Get if path is downloadable (if app's `directory_downloadable` config
        property is True).

        :returns: True if downloadable, False otherwise
        :rtype: bool
        '''
        return self.app.config['directory_downloadable']

    @cached_property
    def can_upload(self):
        '''
        Get if a file can be uploaded to path (if directory path is under app's
        `directory_upload` config property).

        :returns: True if a file can be upload to directory, False otherwise
        :rtype: bool
        '''
        dirbase = self.app.config["directory_upload"]
        # bool() so an unset (None / empty) config value yields False
        # instead of leaking the raw config value to callers.
        return bool(dirbase) and check_base(self.path, dirbase)

    @cached_property
    def is_empty(self):
        '''
        Get if directory is empty (based on :meth:`_listdir`).

        :returns: True if this directory has no entries, False otherwise.
        :rtype: bool
        '''
        if self._listdir_cache is not None:
            # An empty cached listing means an empty directory.
            return not self._listdir_cache
        for entry in self._listdir():
            return False
        return True

    def remove(self):
        '''
        Remove directory tree.

        :raises OutsideRemovableBase: when not under removable base directory
        '''
        super(Directory, self).remove()
        shutil.rmtree(self.path)

    def download(self):
        '''
        Get a Flask Response object streaming a tarball of this directory.

        :returns: Response object
        :rtype: flask.Response
        '''
        return self.app.response_class(
            TarFileStream(
                self.path,
                self.app.config["directory_tar_buffsize"]
                ),
            mimetype="application/octet-stream"
            )

    def contains(self, filename):
        '''
        Check if directory contains an entry with given filename.

        :param filename: filename will be check
        :type filename: str
        :returns: True if exists, False otherwise.
        :rtype: bool
        '''
        return os.path.exists(os.path.join(self.path, filename))

    def choose_filename(self, filename, attempts=999):
        '''
        Get a new filename which does not collide with any entry on directory,
        based on given filename.

        Numbered alternatives are tried first; once they are exhausted,
        random alternatives are generated until a free one is found.

        :param filename: base filename
        :type filename: str
        :param attempts: number of attempts, defaults to 999
        :type attempts: int
        :returns: filename
        :rtype: str
        '''
        new_filename = filename
        for attempt in range(2, attempts + 1):
            if not self.contains(new_filename):
                return new_filename
            new_filename = alternative_filename(filename, attempt)
        while self.contains(new_filename):
            new_filename = alternative_filename(filename)
        return new_filename

    def _listdir(self, precomputed_stats=os.name == 'nt'):
        '''
        Iter unsorted entries on this directory.

        :param precomputed_stats: whether to reuse the stats from the scandir
                                  entry for non-symlink entries, defaults to
                                  True on nt and False elsewhere
        :type precomputed_stats: bool
        :yields: Directory or File instance for each entry in directory
        :ytype: Node
        '''
        for entry in compat.scandir(self.path):
            kwargs = {'path': entry.path, 'app': self.app, 'parent': self}
            if precomputed_stats and not entry.is_symlink():
                kwargs['stats'] = entry.stat()
            if entry.is_dir(follow_symlinks=True):
                yield self.directory_class(**kwargs)
                continue
            yield self.file_class(**kwargs)

    def listdir(self, sortkey=None, reverse=False):
        '''
        Get sorted list (by given sortkey and reverse params) of File objects.

        The result is cached on first call; later calls return the cached
        list regardless of the given parameters.

        :param sortkey: optional key function used for sorting entries
        :param reverse: whether to reverse the resulting order
        :return: sorted list of File instances
        :rtype: list of File
        '''
        if self._listdir_cache is None:
            if sortkey:
                data = sorted(self._listdir(), key=sortkey, reverse=reverse)
            else:
                data = list(self._listdir())
                if reverse:
                    # reversed() cannot consume a generator, so reverse the
                    # materialized list instead.
                    data.reverse()
            self._listdir_cache = data
        return self._listdir_cache
631
632
633
class TarFileStream(object):
    '''
    Tarfile which compresses while reading for streaming.

    Buffsize can be provided, it must be 512 multiple (the tar block size) for
    compression.

    Note on coroutines: this class uses threading by default, but
    coroutine-based applications can change this behavior overriding the
    :attr:`event_class` and :attr:`thread_class` values.
    '''
    event_class = threading.Event
    thread_class = threading.Thread
    tarfile_class = tarfile.open

    def __init__(self, path, buffsize=10240):
        '''
        Internal tarfile object will be created, and compression will start
        on a thread until buffer became full with writes becoming locked until
        a read occurs.

        :param path: local path of directory whose content will be compressed.
        :type path: str
        :param buffsize: size of internal buffer on bytes, defaults to 10KiB
        :type buffsize: int
        '''
        self.path = path
        self.name = os.path.basename(path) + ".tgz"

        # 0 while compressing, 1 once finished, 2 after EOF has been reported
        self._finished = 0
        self._want = 0
        self._data = bytes()
        self._add = self.event_class()
        self._result = self.event_class()
        self._tarfile = self.tarfile_class(  # stream write
            fileobj=self,
            mode="w|gz",
            bufsize=buffsize
            )
        self._th = self.thread_class(target=self.fill)
        self._th.start()

    def fill(self):
        '''
        Writes data on internal tarfile instance, which writes to current
        object, using :meth:`write`.

        As this method is blocking, it is used inside a thread.

        This method is called automatically, on a thread, on initialization,
        so there is little need to call it manually.
        '''
        self._tarfile.add(self.path, "")
        self._tarfile.close()  # force stream flush
        self._finished += 1
        # Wake up a reader which may be waiting for more data.
        if not self._result.is_set():
            self._result.set()

    def write(self, data):
        '''
        Write method used by internal tarfile instance to output data.
        This method blocks tarfile execution once internal buffer is full.

        As this method is blocking, it is used inside the same thread of
        :meth:`fill`.

        :param data: bytes to write to internal buffer
        :type data: bytes
        :returns: number of bytes written
        :rtype: int
        '''
        self._add.wait()
        self._data += data
        if len(self._data) > self._want:
            self._add.clear()
            self._result.set()
        return len(data)

    def read(self, want=0):
        '''
        Read method, gets data from internal buffer while releasing
        :meth:`write` locks when needed.

        The lock usage means it must ran on a different thread than
        :meth:`fill`, ie. the main thread, otherwise will deadlock.

        The combination of both write and this method running on different
        threads makes tarfile being streamed on-the-fly, with data chunks being
        processed and retrieved on demand.

        :param want: number bytes to read, defaults to 0 (all available)
        :type want: int
        :returns: tarfile data as bytes, empty on first read after finishing
        :rtype: bytes
        :raises EOFError: when reading again after end of stream was reported
        '''
        if self._finished:
            if self._finished == 1:
                self._finished += 1
                return bytes()  # empty bytes, consistent with data chunks
            raise EOFError("EOF reached")

        # Thread communication
        self._want = want
        self._add.set()
        self._result.wait()
        self._result.clear()

        if want:
            data = self._data[:want]
            self._data = self._data[want:]
        else:
            data = self._data
            self._data = bytes()
        return data

    def __iter__(self):
        '''
        Iterate through tarfile result chunks.

        Similarly to :meth:`read`, this method must ran on a different thread
        than :meth:`write` calls.

        :yields: data chunks as taken from :meth:`read`.
        :ytype: bytes
        '''
        data = self.read()
        while data:
            yield data
            data = self.read()
762
763
764
class OutsideDirectoryBase(Exception):
    '''
    Exception thrown when trying to access to a file outside path defined on
    `directory_base` config property.
    '''
    pass
770
771
772
class OutsideRemovableBase(Exception):
    '''
    Exception thrown when trying to access to a file outside path defined on
    `directory_remove` config property.
    '''
    pass
778
779
780
def fmt_size(size, binary=True):
    '''
    Get size and unit.

    The size is divided by 1024 (binary) or 1000 (standard) until it falls
    below 1000 or the biggest unit is reached.

    :param size: size in bytes
    :param binary: whether use binary or standard units, defaults to True
    :return: size and unit
    :rtype: tuple of number (float once scaled) and unit as str
    '''
    if binary:
        fmt_sizes = binary_units
        fmt_divider = 1024.
    else:
        fmt_sizes = standard_units
        fmt_divider = 1000.
    for fmt in fmt_sizes[:-1]:
        if size < 1000:
            return (size, fmt)
        size /= fmt_divider
    return size, fmt_sizes[-1]
800
801
802
def relativize_path(path, base, os_sep=os.sep):
    '''
    Make absolute path relative to an absolute base.

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: relative path
    :rtype: str or unicode
    :raises OutsideDirectoryBase: if path is not below base
    '''
    if not check_base(path, base, os_sep):
        raise OutsideDirectoryBase("%r is not under %r" % (path, base))
    # Skip the base and the separator following it.
    prefix_len = len(base)
    if not base.endswith(os_sep):
        prefix_len += len(os_sep)
    return path[prefix_len:]
819
820
821
def abspath_to_urlpath(path, base, os_sep=os.sep):
    '''
    Make filesystem absolute path uri relative using given absolute base path.

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: relative uri
    :rtype: str or unicode
    :raises OutsideDirectoryBase: if resulting path is not below base
    '''
    return relativize_path(path, base, os_sep).replace(os_sep, '/')
833
834
835
def urlpath_to_abspath(path, base, os_sep=os.sep):
    '''
    Make uri relative path fs absolute using a given absolute base path.

    :param path: relative path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: absolute path
    :rtype: str or unicode
    :raises OutsideDirectoryBase: if resulting path is not below base
    '''
    prefix = base if base.endswith(os_sep) else base + os_sep
    realpath = os.path.abspath(prefix + path.replace('/', os_sep))
    # abspath drops any trailing separator, so a base given with one must
    # still be recognized as the base directory itself.
    is_base = realpath == base or realpath + os_sep == prefix
    if is_base or realpath.startswith(prefix):
        return realpath
    raise OutsideDirectoryBase("%r is not under %r" % (realpath, base))
851
852
853
def generic_filename(path):
    '''
    Extract filename of given path os-independently, taking care of known path
    separators.

    :param path: path
    :return: filename
    :rtype: str or unicode (depending on given path)
    '''

    for sep in common_path_separators:
        if sep in path:
            _, path = path.rsplit(sep, 1)
    return path
867
868
869
def clean_restricted_chars(path, restricted_chars=restricted_chars):
    '''
    Get path with every restricted character replaced by an underscore.

    :param path: path
    :param restricted_chars: characters to replace, defaults to module's
                             restricted_chars
    :return: path without restricted characters
    :rtype: str or unicode (depending on given path)
    '''
    for character in restricted_chars:
        path = path.replace(character, '_')
    return path
880
881
882
def check_forbidden_filename(filename,
                             destiny_os=os.name,
                             restricted_names=restricted_names):
    '''
    Get if given filename is forbidden for current OS or filesystem.

    :param filename: filename to check
    :param destiny_os: destination operative system
    :param restricted_names: names forbidden regardless of destination OS
    :return: whether is forbidden on given OS (or filesystem) or not
    :rtype: bool
    '''
    if destiny_os == 'nt':
        # Device names are matched ignoring case and anything after the
        # first dot.
        fpc = filename.split('.', 1)[0].upper()
        if fpc in nt_device_names:
            return True

    return filename in restricted_names
900
901
902
def check_base(path, base, os_sep=os.sep):
    '''
    Check if given absolute path is the given base or is under it.

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: whether file is given base, or under it, or not
    :rtype: bool
    '''
    # Drop a trailing separator so the equality check matches the base.
    base = base[:-len(os_sep)] if base.endswith(os_sep) else base
    return path == base or check_under_base(path, base, os_sep)
913
914
915
def check_under_base(path, base, os_sep=os.sep):
    '''
    Check if given absolute path is under given base.

    The base itself is not considered to be under base.

    :param path: absolute path
    :param base: absolute base path
    :param os_sep: path component separator, defaults to current OS separator
    :return: whether file is under given base or not
    :rtype: bool
    '''
    # The separator is appended so sibling names sharing the same prefix
    # (ie. "/a" and "/ab") do not match.
    prefix = base if base.endswith(os_sep) else base + os_sep
    return path.startswith(prefix)
926
927
928
def secure_filename(path, destiny_os=os.name, fs_encoding=compat.FS_ENCODING):
    '''
    Get rid of parent path components and special filenames.

    If path is invalid or protected, return empty string.

    :param path: unsafe path
    :type: str
    :param destiny_os: destination operative system
    :type destiny_os: str
    :param fs_encoding: destination filesystem filename encoding
    :type fs_encoding: str
    :return: filename or empty string
    :rtype: str
    '''
    path = generic_filename(path)
    path = clean_restricted_chars(path)

    if check_forbidden_filename(path, destiny_os=destiny_os):
        return ''

    if isinstance(path, bytes):
        path = path.decode('latin-1', errors=underscore_replace)

    # Decode and recover from filesystem encoding in order to strip unwanted
    # characters out
    kwargs = dict(
        os_name=destiny_os,
        fs_encoding=fs_encoding,
        errors=underscore_replace
        )
    fs_encoded_path = compat.fsencode(path, **kwargs)
    fs_decoded_path = compat.fsdecode(fs_encoded_path, **kwargs)
    return fs_decoded_path
960
961
962
def alternative_filename(filename, attempt=None):
    '''
    Generates an alternative version of given filename.

    If a numeric attempt parameter is given, it will be used on the
    alternative name, a random value will be used otherwise.

    The extra text is inserted before the extension, with up to the last two
    dot-separated suffixes being taken as extension.

    :param filename: original filename
    :param attempt: optional attempt number, defaults to null
    :return: new filename
    :rtype: str or unicode
    '''
    filename_parts = filename.rsplit(u'.', 2)
    name = filename_parts[0]
    ext = ''.join(u'.%s' % ext for ext in filename_parts[1:])
    if attempt is None:
        choose = random.choice
        extra = u' %s' % ''.join(choose(fs_safe_characters) for i in range(8))
    else:
        extra = u' (%d)' % attempt
    return u'%s%s%s' % (name, extra, ext)
983