Passed
Pull Request — master (#792)
by Konstantin
02:08
created

ocrd_utils.os.itertree()   A

Complexity

Conditions 4

Size

Total Lines 10
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 7
dl 0
loc 10
rs 10
c 0
b 0
f 0
cc 4
nop 1
1
"""
2
Operating system functions.
3
"""
4
# Names exported by a wildcard import (``from <this module> import *``).
# NOTE(review): list_resource_candidates, list_all_resources, itertree and
# AtomicWriterPerms are defined in this module but not listed here -- confirm
# whether that is intentional.
__all__ = [
    'abspath',
    'is_file_in_directory',
    'get_processor_resource_types',
    'pushd_popd',
    'unzip_file_to_dir',
    'atomic_write',
]
12
13
from tempfile import TemporaryDirectory
14
import contextlib
15
from distutils.spawn import find_executable as which
16
from json import loads
17
from os import getcwd, chdir, stat, chmod, umask, environ
18
from pathlib import Path
19
from os.path import exists, abspath as abspath_, join, isdir
20
from zipfile import ZipFile
21
from subprocess import run, PIPE
22
23
from atomicwrites import atomic_write as atomic_write_, AtomicWriter
24
25
from .constants import XDG_DATA_HOME
26
27
def abspath(url):
    """
    Get a full path to a file or file URL

    A leading ``file://`` scheme prefix is stripped, then the remainder is
    made absolute and normalized with ``os.path.abspath``.

    Arguments:
        url (str or os.PathLike): local path or ``file://`` URL. Path-like
            objects such as ``pathlib.Path`` are converted with ``os.fspath``.

    Returns:
        The absolute path (a ``str`` for ``str`` and ``pathlib.Path`` input).

    See os.abspath
    """
    # Function-scope import so the block does not depend on a module-level change
    from os import fspath
    # Normalize path-like objects first; ``Path`` has no ``startswith``
    url = fspath(url)
    scheme = 'file://'
    if url.startswith(scheme):
        url = url[len(scheme):]
    return abspath_(url)
36
37
@contextlib.contextmanager
def pushd_popd(newcwd=None, tempdir=False):
    """
    Context manager that temporarily changes the working directory

    On exit (normal or via an exception) the working directory that was
    current on entry is restored.

    Arguments:
        newcwd (str): directory to change into. If falsy and ``tempdir`` is
            not set, the working directory is left unchanged.
        tempdir (bool): if true, create a temporary directory, change into it
            and remove it again on exit.

    Yields:
        The path of the temporary directory if ``tempdir`` is set, otherwise
        ``newcwd`` as passed in (possibly ``None``).

    Raises:
        Exception: if both ``newcwd`` and ``tempdir`` are given.
    """
    if newcwd and tempdir:
        raise Exception("pushd_popd can accept either newcwd or tempdir, not both")
    try:
        oldcwd = getcwd()
    except FileNotFoundError:
        # This happens when a directory is deleted before the context is exited;
        # fall back to the platform's temporary directory instead of a
        # hard-coded '/tmp'
        from tempfile import gettempdir
        oldcwd = gettempdir()
    if tempdir:
        with TemporaryDirectory() as tempcwd:
            try:
                chdir(tempcwd)
                yield tempcwd
            finally:
                # Leave the temporary directory *before* it is removed, so the
                # process never sits in (or blocks deletion of) a stale cwd
                chdir(oldcwd)
    else:
        try:
            if newcwd:
                chdir(newcwd)
            yield newcwd
        finally:
            chdir(oldcwd)
57
58
def unzip_file_to_dir(path_to_zip, output_directory):
    """
    Extract a ZIP archive to a directory

    Arguments:
        path_to_zip (str): path of the ZIP archive to read
        output_directory (str): directory to extract all members into
    """
    # The context manager closes the archive even if extraction fails
    with ZipFile(path_to_zip, 'r') as archive:
        archive.extractall(output_directory)
65
66
def list_resource_candidates(executable, fname, cwd=None, moduled=None, xdg_data_home=None):
    """
    Generate candidates for processor resources according to
    https://ocr-d.de/en/spec/ocrd_tool#file-parameters

    Candidates are returned in this order: ``cwd``, every entry of the
    ``:``-separated environment variable ``<EXECUTABLE>_PATH`` (executable
    name upper-cased, ``-`` replaced by ``_``) if set, the
    ``ocrd-resources/<executable>`` directory below ``xdg_data_home``,
    ``/usr/local/share/ocrd-resources/<executable>`` and finally ``moduled``.
    No candidate is checked for existence.

    Arguments:
        executable (str): name of the processor executable
        fname (str): file name of the resource
        cwd (str): directory of the first candidate; defaults to the working
            directory at the time of the call
        moduled (str): optional module directory of the processor
        xdg_data_home (str): overrides ``XDG_DATA_HOME`` if truthy

    Returns:
        list of candidate paths (str)
    """
    if cwd is None:
        # Resolved per call: a ``cwd=getcwd()`` default would be evaluated
        # only once, when the module is imported
        cwd = getcwd()
    xdg_data_home = xdg_data_home if xdg_data_home else XDG_DATA_HOME
    candidates = [join(cwd, fname)]
    processor_path_var = '%s_PATH' % executable.replace('-', '_').upper()
    if processor_path_var in environ:
        candidates += [join(x, fname) for x in environ[processor_path_var].split(':')]
    candidates.append(join(xdg_data_home, 'ocrd-resources', executable, fname))
    candidates.append(join('/usr/local/share/ocrd-resources', executable, fname))
    if moduled:
        candidates.append(join(moduled, fname))
    return candidates
82
83
def list_all_resources(executable, moduled=None, xdg_data_home=None):
    """
    List all processor resources in the filesystem according to
    https://ocr-d.de/en/spec/ocrd_tool#file-parameters

    The entries of these locations are collected: every existing directory
    named in the ``:``-separated environment variable ``<EXECUTABLE>_PATH``,
    ``<xdg_data_home>/ocrd-resources/<executable>`` and
    ``/usr/local/share/ocrd-resources/<executable>``. If ``moduled`` is given,
    it is searched recursively for non-directory files that do not look like
    code or caches. Finally, the contents of every directory collected so far
    (except ``.git``) are added, one level deep.

    Arguments:
        executable (str): name of the processor executable
        moduled (str): optional module directory of the processor
        xdg_data_home (str): overrides ``XDG_DATA_HOME`` if truthy

    Returns:
        sorted list of paths (str)
    """
    candidates = []
    xdg_data_home = xdg_data_home if xdg_data_home else XDG_DATA_HOME
    # The current working directory is deliberately not searched:
    # it would list too many false positives
    processor_path_var = '%s_PATH' % executable.replace('-', '_').upper()
    if processor_path_var in environ:
        for processor_path in environ[processor_path_var].split(':'):
            if Path(processor_path).is_dir():
                candidates += Path(processor_path).iterdir()
    datadir = Path(xdg_data_home, 'ocrd-resources', executable)
    if datadir.is_dir():
        candidates += datadir.iterdir()
    systemdir = Path('/usr/local/share/ocrd-resources', executable)
    if systemdir.is_dir():
        candidates += systemdir.iterdir()
    if moduled:
        # Python distributions do not distinguish between
        # code and data; `is_resource()` only singles out
        # files over directories; but we want data files only
        # todo: more code and cache exclusion patterns!
        excluded_patterns = ['*.py', '*.py[cod]', '*~', 'ocrd-tool.json']
        # recurse fully
        for resource in itertree(Path(moduled)):
            if resource.is_dir():
                continue
            if any(resource.match(pattern) for pattern in excluded_patterns):
                continue
            candidates.append(resource)
    # recurse once: iterate over a snapshot, because extending the list that
    # is being iterated would also visit the new entries and thereby descend
    # to arbitrary depth
    for parent in list(candidates):
        if parent.is_dir() and parent.name != '.git':
            candidates += parent.iterdir()
    return sorted(str(x) for x in candidates)
123
124
def get_processor_resource_types(executable, ocrd_tool=None):
    """
    Determine what type of resource parameters a processor needs.

    Return a list of MIME types (with the special value `*/*` to
    designate that arbitrary files or directories are allowed).

    Arguments:
        executable (str): name of the processor executable
        ocrd_tool (dict): the processor's ocrd-tool description. If falsy, it
            is obtained by running ``<executable> --dump-json``.

    Returns:
        The ``content-type`` values of all parameters that declare one, or
        ``['*/*']`` if there are none or the executable cannot be found.

    Raises:
        subprocess.CalledProcessError: if ``<executable> --dump-json`` exits
            with a non-zero status.
    """
    if not ocrd_tool:
        # if the processor in question is not installed, assume both files and directories
        if not which(executable):
            return ['*/*']
        result = run([executable, '--dump-json'], stdout=PIPE, check=True, universal_newlines=True)
        ocrd_tool = loads(result.stdout)
    # A tool description without a 'parameters' section declares no resources
    content_types = [p['content-type'] for p in ocrd_tool.get('parameters', {}).values()
                     if 'content-type' in p]
    # None of the parameters for this processor are resources (or the
    # resource parameters are not properly declared), so output both
    # directories and files
    return content_types or ['*/*']
144
145
# ht @pabs3
146
# https://github.com/untitaker/python-atomicwrites/issues/42
147
class AtomicWriterPerms(AtomicWriter):
    """
    ``AtomicWriter`` variant that sets the permission bits of the file object
    it hands out.
    """

    def get_fileobject(self, **kwargs):
        """
        Return the parent's file object after applying a file mode to it.

        The mode is taken from the existing file at ``self._path``; if there
        is no such file, ``0o664`` restricted by the process umask is used.
        """
        fileobj = super().get_fileobject(**kwargs)
        try:
            perms = stat(self._path).st_mode
        except FileNotFoundError:
            # Creating a new file, emulate what os.open() does:
            # the umask can only be read by setting it, so set and restore
            current_umask = umask(0)
            umask(current_umask)
            perms = 0o664 & ~current_umask
        chmod(fileobj.fileno(), perms)
        return fileobj
160
161
@contextlib.contextmanager
def atomic_write(fpath):
    """
    Context manager yielding a file object for atomically writing ``fpath``

    Delegates to ``atomicwrites.atomic_write`` with ``overwrite=True`` and
    ``AtomicWriterPerms`` as the writer class.
    """
    writer = atomic_write_(fpath, writer_cls=AtomicWriterPerms, overwrite=True)
    with writer as handle:
        yield handle
165
166
167
def is_file_in_directory(directory, file):
    """
    Tell whether the path components of ``directory`` are a prefix of those
    of ``file``.

    The comparison is purely lexical, on the ``parts`` of both paths;
    the filesystem is not consulted.
    """
    dir_parts = Path(directory).parts
    file_parts = Path(file).parts
    leading = file_parts[:len(dir_parts)]
    return leading == dir_parts
174
175
def itertree(path):
    """
    Recursively enumerate ``path``, yielding ``pathlib.Path`` objects

    The contents of a directory are yielded before the directory itself;
    ``path`` is therefore always the last item.
    """
    node = path if isinstance(path, Path) else Path(path)
    if node.is_dir():
        for child in node.iterdir():
            for descendant in itertree(child):
                yield descendant
    yield node
185