Passed
Push — master ( 1eaf93...e54143 )
by Konstantin
02:29 queued 31s
created

ocrd_utils.os.AtomicWriterPerms.get_fileobject()   A

Complexity

Conditions 2

Size

Total Lines 12
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 11
dl 0
loc 12
rs 9.85
c 0
b 0
f 0
cc 2
nop 2
1
"""
2
Operating system functions.
3
"""
4
__all__ = [
5
    'abspath',
6
    'pushd_popd',
7
    'unzip_file_to_dir',
8
    'atomic_write',
9
]
10
11
from atomicwrites import atomic_write as atomic_write_, AtomicWriter
12
from tempfile import TemporaryDirectory
13
import contextlib
14
from os import getcwd, chdir, stat, fchmod, umask
15
from os.path import exists, abspath as abspath_
16
17
from zipfile import ZipFile
18
19
def abspath(url):
20
    """
21
    Get a full path to a file or file URL
22
23
    See os.abspath
24
    """
25
    if url.startswith('file://'):
26
        url = url[len('file://'):]
27
    return abspath_(url)
28
29
@contextlib.contextmanager
30
def pushd_popd(newcwd=None, tempdir=False):
31
    if newcwd and tempdir:
32
        raise Exception("pushd_popd can accept either newcwd or tempdir, not both")
33
    try:
34
        oldcwd = getcwd()
35
    except FileNotFoundError as e:  # pylint: disable=unused-variable
36
        # This happens when a directory is deleted before the context is exited
37
        oldcwd = '/tmp'
38
    try:
39
        if tempdir:
40
            with TemporaryDirectory() as tempcwd:
41
                chdir(tempcwd)
42
                yield tempcwd
43
        else:
44
            if newcwd:
45
                chdir(newcwd)
46
            yield newcwd
47
    finally:
48
        chdir(oldcwd)
49
50
def unzip_file_to_dir(path_to_zip, output_directory):
51
    """
52
    Extract a ZIP archive to a directory
53
    """
54
    z = ZipFile(path_to_zip, 'r')
55
    z.extractall(output_directory)
56
    z.close()
57
58
59
60
# ht @pabs3
61
# https://github.com/untitaker/python-atomicwrites/issues/42
62
class AtomicWriterPerms(AtomicWriter):
63
    def get_fileobject(self, **kwargs):
64
        f = super().get_fileobject(**kwargs)
65
        try:
66
            mode = stat(self._path).st_mode
67
        except FileNotFoundError:
68
            # Creating a new file, emulate what os.open() does
69
            mask = umask(0)
70
            umask(mask)
71
            mode = 0o664 & ~mask
72
        fd = f.fileno()
73
        fchmod(fd, mode)
74
        return f
75
76
@contextlib.contextmanager
77
def atomic_write(fpath):
78
    with atomic_write_(fpath, writer_cls=AtomicWriterPerms, overwrite=True) as f:
79
        yield f
80