Passed
Pull Request — master (#800)
by Konstantin
02:30
created

ocrd_utils.os.itertree()   A

Complexity

Conditions 4

Size

Total Lines 10
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 7
dl 0
loc 10
rs 10
c 0
b 0
f 0
cc 4
nop 1
1
"""
2
Operating system functions.
3
"""
4
__all__ = [
5
    'abspath',
6
    'directory_size',
7
    'is_file_in_directory',
8
    'get_ocrd_tool_json',
9
    'get_processor_resource_types',
10
    'pushd_popd',
11
    'unzip_file_to_dir',
12
    'atomic_write',
13
]
14
15
from tempfile import TemporaryDirectory
16
from functools import lru_cache
17
import contextlib
18
from distutils.spawn import find_executable as which
19
from json import loads
20
from json.decoder import JSONDecodeError
21
from os import getcwd, chdir, stat, chmod, umask, environ
22
from pathlib import Path
23
from os.path import exists, abspath as abspath_, join, isdir
24
from zipfile import ZipFile
25
from subprocess import run, PIPE
26
27
from atomicwrites import atomic_write as atomic_write_, AtomicWriter
28
29
from .constants import XDG_DATA_HOME
30
31
def abspath(url):
    """
    Resolve a path or ``file://`` URL to an absolute filesystem path.

    A leading ``file://`` scheme is stripped, then the remainder is passed
    to :func:`os.path.abspath`.
    """
    scheme = 'file://'
    path = url[len(scheme):] if url.startswith(scheme) else url
    return abspath_(path)
40
41
@contextlib.contextmanager
def pushd_popd(newcwd=None, tempdir=False):
    """
    Context manager that changes the working directory and restores it on exit.

    Arguments:
        newcwd: directory to change into; if falsy, the working directory
            is left unchanged
        tempdir: if true, change into a freshly created temporary directory
            that exists only for the duration of the context

    Yields:
        the temporary directory path if ``tempdir`` is set, else ``newcwd``

    Raises:
        Exception: if both ``newcwd`` and ``tempdir`` are given
    """
    if newcwd and tempdir:
        raise Exception("pushd_popd can accept either newcwd or tempdir, not both")
    try:
        previous = getcwd()
    except FileNotFoundError:
        # The current directory no longer exists (e.g. it was deleted before
        # an enclosing context exited), so fall back to a fixed location
        previous = '/tmp'
    try:
        if not tempdir:
            if newcwd:
                chdir(newcwd)
            yield newcwd
        else:
            with TemporaryDirectory() as scratch:
                chdir(scratch)
                yield scratch
    finally:
        # Always return to where we started, even if the body raised
        chdir(previous)
61
62
def unzip_file_to_dir(path_to_zip, output_directory):
    """
    Extract a ZIP archive to a directory

    Arguments:
        path_to_zip: path of the ZIP archive to read
        output_directory: directory to extract all archive members into
    """
    # The context manager closes the archive even if extraction fails
    with ZipFile(path_to_zip, 'r') as archive:
        archive.extractall(output_directory)
69
70
@lru_cache()
def get_ocrd_tool_json(executable):
    """
    Get the ``ocrd-tool`` description of ``executable``.

    Runs ``executable --dump-json`` and parses its standard output as JSON.
    If the description has no ``resource_locations`` entry, the default
    ``['data', 'cwd', 'system', 'module']`` is filled in.

    The result is memoized per ``executable``, so every caller receives the
    same dict object and should treat it as read-only.

    Raises:
        FileNotFoundError: if ``executable`` cannot be found
        json.decoder.JSONDecodeError: if the output is not valid JSON
    """
    ocrd_tool = loads(run([executable, '--dump-json'], stdout=PIPE).stdout)
    ocrd_tool.setdefault('resource_locations', ['data', 'cwd', 'system', 'module'])
    return ocrd_tool
80
81
# def get_processor_resources_list(executable):
82
#     """
83
#     Get the list of resources that a processor is providing via
84
#     ``-list-resources``
85
#     """
86
87
def list_resource_candidates(executable, fname, cwd=None, moduled=None, xdg_data_home=None):
    """
    Generate candidates for processor resources according to
    https://ocr-d.de/en/spec/ocrd_tool#file-parameters

    Arguments:
        executable: name of the processor executable (e.g. ``ocrd-foo``)
        fname: file name of the resource to look for
        cwd: directory for the first candidate; defaults to the working
            directory at the time of the call
        moduled: optional module directory, appended as the last candidate
        xdg_data_home: override for the ``XDG_DATA_HOME`` constant

    Returns:
        list of candidate paths (strings) in lookup order: ``cwd``, every
        directory listed in the ``<EXECUTABLE>_PATH`` environment variable,
        the XDG data directory, the system directory and finally ``moduled``
    """
    if cwd is None:
        # Resolve at call time: a ``cwd=getcwd()`` default would be frozen
        # when the module is imported and go stale after any chdir()
        cwd = getcwd()
    if not xdg_data_home:
        xdg_data_home = XDG_DATA_HOME
    candidates = [join(cwd, fname)]
    # e.g. ``ocrd-foo`` is looked up under the variable ``OCRD_FOO_PATH``
    processor_path_var = '%s_PATH' % executable.replace('-', '_').upper()
    if processor_path_var in environ:
        candidates += [join(x, fname) for x in environ[processor_path_var].split(':')]
    candidates.append(join(xdg_data_home, 'ocrd-resources', executable, fname))
    candidates.append(join('/usr/local/share/ocrd-resources', executable, fname))
    if moduled:
        candidates.append(join(moduled, fname))
    return candidates
103
104
def list_all_resources(executable, moduled=None, xdg_data_home=None):
    """
    List all processor resources in the filesystem according to
    https://ocr-d.de/en/spec/ocrd_tool#file-parameters

    Arguments:
        executable: name of the processor executable
        moduled: optional module directory to search recursively
        xdg_data_home: override for the ``XDG_DATA_HOME`` constant

    Returns:
        sorted list of the paths found, as strings
    """
    try:
        locations = get_ocrd_tool_json(executable)['resource_locations']
    except FileNotFoundError:
        # The processor whose resource locations we want is not installed,
        # so assume the default
        locations = ['data', 'cwd', 'system', 'module']
    if not xdg_data_home:
        xdg_data_home = XDG_DATA_HOME
    found = []
    # The 'cwd' location is deliberately not enumerated: it would list too
    # many false positives
    env_var = '%s_PATH' % executable.replace('-', '_').upper()
    if env_var in environ:
        for entry in environ[env_var].split(':'):
            if Path(entry).is_dir():
                found.extend(Path(entry).iterdir())
    base_dirs = []
    if 'data' in locations:
        base_dirs.append(Path(xdg_data_home, 'ocrd-resources', executable))
    if 'system' in locations:
        base_dirs.append(Path('/usr/local/share/ocrd-resources', executable))
    for base in base_dirs:
        if base.is_dir():
            found.extend(base.iterdir())
    if moduled and 'module' in locations:
        # Python distributions do not distinguish between code and data, but
        # we want data files only, so skip code, cache and backup files
        # todo: more code and cache exclusion patterns!
        excluded = ['*.py', '*.py[cod]', '*~', 'ocrd-tool.json']
        # recurse fully
        for resource in itertree(Path(moduled)):
            if resource.is_dir():
                continue
            if not any(resource.match(pattern) for pattern in excluded):
                found.append(resource)
    # Expand directories among the results (except ``.git``). The list is
    # extended while it is being iterated, so entries added here are
    # visited by this loop as well.
    for parent in found:
        if parent.is_dir() and parent.name != '.git':
            found.extend(parent.iterdir())
    return sorted(str(x) for x in found)
153
154
def get_processor_resource_types(executable, ocrd_tool=None):
    """
    Determine what type of resource parameters a processor needs.

    Arguments:
        executable: name of the processor executable
        ocrd_tool: the processor's ``ocrd-tool`` description; if not given,
            it is obtained from ``executable`` itself

    Return a list of MIME types (with the special value `*/*` to
    designate that arbitrary files or directories are allowed).
    """
    if not ocrd_tool:
        # if the processor in question is not installed, assume both files and directories
        if not which(executable):
            return ['*/*']
        ocrd_tool = get_ocrd_tool_json(executable)
    # A description without any ``parameters`` section is treated like one
    # that declares no resource parameters
    content_types = [param['content-type']
                     for param in ocrd_tool.get('parameters', {}).values()
                     if 'content-type' in param]
    # If none of the parameters are resources (or the resource parameters
    # are not properly declared), allow both directories and files
    return content_types or ['*/*']
173
174
# ht @pabs3
175
# https://github.com/untitaker/python-atomicwrites/issues/42
176
class AtomicWriterPerms(AtomicWriter):
    """
    ``AtomicWriter`` variant that sets the permission bits of the file
    object it hands out: those of the existing target file if there is one,
    otherwise ``0o664`` restricted by the process umask.
    """

    def get_fileobject(self, **kwargs):
        """
        Return the file object created by the parent class after applying
        the target's permissions to it.
        """
        handle = super().get_fileobject(**kwargs)
        try:
            perms = stat(self._path).st_mode
        except FileNotFoundError:
            # Creating a new file, emulate what os.open() does.
            # umask() can only be read by setting it, so set a dummy value
            # and restore the previous one right away
            current_umask = umask(0)
            umask(current_umask)
            perms = 0o664 & ~current_umask
        chmod(handle.fileno(), perms)
        return handle
189
190
@contextlib.contextmanager
def atomic_write(fpath):
    """
    Context manager yielding a file object for atomically writing ``fpath``.

    Delegates to ``atomicwrites.atomic_write`` with overwriting enabled and
    ``AtomicWriterPerms`` as the writer class.
    """
    writer = atomic_write_(fpath, writer_cls=AtomicWriterPerms, overwrite=True)
    with writer as handle:
        yield handle
194
195
196
def is_file_in_directory(directory, file):
    """
    Return True if ``file`` is in ``directory``.

    The check is purely lexical: the components of ``directory`` must be
    the leading components of ``file``. The filesystem is not consulted.
    """
    dir_parts = Path(directory).parts
    file_parts = Path(file).parts
    return file_parts[:len(dir_parts)] == dir_parts
203
204
def itertree(path):
    """
    Recursively enumerate ``path``, yielding :class:`pathlib.Path` objects.

    The contents of a directory are yielded before the directory itself,
    and ``path`` is yielded last.
    """
    root = path if isinstance(path, Path) else Path(path)
    if root.is_dir():
        for child in root.iterdir():
            for descendant in itertree(child):
                yield descendant
    yield root
214
215
def directory_size(path):
    """
    Calculate the total size in bytes of all files below directory ``path``,
    including those in subdirectories.
    """
    total = 0
    for entry in Path(path).glob('**/*'):
        if entry.is_file():
            total += entry.stat().st_size
    return total
221