Passed: Pull Request — master (#800), by Konstantin, created 02:17

ocrd_utils.os.abspath(): grade A

Complexity:   Conditions 2
Size:         Total Lines 9, Code Lines 4
Duplication:  Lines 0, Ratio 0 %
Importance:   Changes 0

Metric   Value
eloc     4
dl       0
loc      9
rs       10
c        0
b        0
f        0
cc       2
nop      1
"""
Operating system functions.
"""
__all__ = [
    'abspath',
    'directory_size',
    'is_file_in_directory',
    'get_ocrd_tool_json',
    'get_processor_resource_types',
    'pushd_popd',
    'unzip_file_to_dir',
    'atomic_write',
]

from tempfile import TemporaryDirectory
from functools import lru_cache
import contextlib
from distutils.spawn import find_executable as which
from json import loads
from json.decoder import JSONDecodeError
from os import getcwd, chdir, stat, chmod, umask, environ
from pathlib import Path
from os.path import exists, abspath as abspath_, join, isdir
from zipfile import ZipFile
from subprocess import run, PIPE

from atomicwrites import atomic_write as atomic_write_, AtomicWriter

from .constants import XDG_DATA_HOME

def abspath(url):
    """
    Get a full path to a file or file URL.

    See ``os.path.abspath``.
    """
    if url.startswith('file://'):
        url = url[len('file://'):]
    return abspath_(url)

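# Usage sketch (hypothetical path, not part of the original module):
#
#     >>> abspath('file:///tmp/mets.xml')
#     '/tmp/mets.xml'
#
# Plain paths are resolved against the current working directory.
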
@contextlib.contextmanager
def pushd_popd(newcwd=None, tempdir=False):
    """
    Change the current working directory to ``newcwd`` (or to a temporary
    directory if ``tempdir`` is set) for the duration of the context,
    then change back to the previous working directory.
    """
    if newcwd and tempdir:
        raise Exception("pushd_popd can accept either newcwd or tempdir, not both")
    try:
        oldcwd = getcwd()
    except FileNotFoundError:
        # This happens when a directory is deleted before the context is exited
        oldcwd = '/tmp'
    try:
        if tempdir:
            with TemporaryDirectory() as tempcwd:
                chdir(tempcwd)
                yield tempcwd
        else:
            if newcwd:
                chdir(newcwd)
            yield newcwd
    finally:
        chdir(oldcwd)

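# Usage sketch (not part of the original module):
#
#     >>> with pushd_popd('/tmp') as newcwd:
#     ...     pass  # the process cwd is now newcwd
#     >>> with pushd_popd(tempdir=True) as tempcwd:
#     ...     pass  # work inside a temporary directory that is removed on exit
#
# The previous working directory is restored on exit, even if an exception is raised.
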
def unzip_file_to_dir(path_to_zip, output_directory):
    """
    Extract a ZIP archive to a directory
    """
    z = ZipFile(path_to_zip, 'r')
    z.extractall(output_directory)
    z.close()

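# Usage sketch (hypothetical paths, not part of the original module):
#
#     >>> unzip_file_to_dir('/tmp/workspace.ocrd.zip', '/tmp/workspace')
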
# XXX Tue Feb 22 19:23:35 CET 2022 caching disabled as it interferes with testing
# @lru_cache()
def get_ocrd_tool_json(executable):
    """
    Get the ``ocrd-tool`` description of ``executable``.
    """
    executable_name = Path(executable).name
    try:
        ocrd_tool = loads(run([executable, '--dump-json'], stdout=PIPE).stdout)
    # except FileNotFoundError:
    #     ocrd_tool = {}
    except JSONDecodeError:
        ocrd_tool = {}
    if 'resource_locations' not in ocrd_tool:
        ocrd_tool['resource_locations'] = ['data', 'cwd', 'system', 'module']
    return ocrd_tool

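# Usage sketch (hypothetical processor name, not part of the original module):
#
#     >>> tool = get_ocrd_tool_json('ocrd-dummy')
#     >>> tool['resource_locations']   # full default list if the tool declares none
#     ['data', 'cwd', 'system', 'module']
#
# If the executable prints invalid JSON, only the default resource_locations remain.
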
# def get_processor_resources_list(executable):
#     """
#     Get the list of resources that a processor is providing via
#     ``--list-resources``
#     """

def list_resource_candidates(executable, fname, cwd=getcwd(), moduled=None, xdg_data_home=None):
    """
    Generate candidates for processor resources according to
    https://ocr-d.de/en/spec/ocrd_tool#file-parameters
    """
    candidates = []
    candidates.append(join(cwd, fname))
    xdg_data_home = XDG_DATA_HOME if not xdg_data_home else xdg_data_home
    processor_path_var = '%s_PATH' % executable.replace('-', '_').upper()
    if processor_path_var in environ:
        candidates += [join(x, fname) for x in environ[processor_path_var].split(':')]
    candidates.append(join(xdg_data_home, 'ocrd-resources', executable, fname))
    candidates.append(join('/usr/local/share/ocrd-resources', executable, fname))
    if moduled:
        candidates.append(join(moduled, fname))
    return candidates

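# Usage sketch (hypothetical names, not part of the original module). Candidates are
# returned in lookup order: current working directory, any directory listed in the
# processor's <EXECUTABLE>_PATH environment variable, the XDG data location, the
# system location, and finally the module directory (if given):
#
#     >>> list_resource_candidates('ocrd-dummy', 'model.bin', moduled='/src/ocrd_dummy')
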
def list_all_resources(executable, moduled=None, xdg_data_home=None):
    """
    List all processor resources in the filesystem according to
    https://ocr-d.de/en/spec/ocrd_tool#file-parameters
    """
    candidates = []
    resource_locations = get_ocrd_tool_json(executable)['resource_locations']
    xdg_data_home = XDG_DATA_HOME if not xdg_data_home else xdg_data_home
    # XXX cwd would list too many false positives
    # if 'cwd' in resource_locations:
    #     cwd_candidate = join(getcwd(), 'ocrd-resources', executable)
    #     if Path(cwd_candidate).exists():
    #         candidates.append(cwd_candidate)
    processor_path_var = '%s_PATH' % executable.replace('-', '_').upper()
    if processor_path_var in environ:
        for processor_path in environ[processor_path_var].split(':'):
            if Path(processor_path).is_dir():
                candidates += Path(processor_path).iterdir()
    if 'data' in resource_locations:
        datadir = Path(xdg_data_home, 'ocrd-resources', executable)
        if datadir.is_dir():
            candidates += datadir.iterdir()
    if 'system' in resource_locations:
        systemdir = Path('/usr/local/share/ocrd-resources', executable)
        if systemdir.is_dir():
            candidates += systemdir.iterdir()
    if 'module' in resource_locations and moduled:
        # recurse fully
        for resource in itertree(Path(moduled)):
            if resource.is_dir():
                continue
            if any(resource.match(pattern) for pattern in
                   # Python distributions do not distinguish between
                   # code and data; `is_resource()` only singles out
                   # files over directories; but we want data files only
                   # todo: more code and cache exclusion patterns!
                   ['*.py', '*.py[cod]', '*~', 'ocrd-tool.json']):
                continue
            candidates.append(resource)
    # recurse once
    for parent in candidates:
        if parent.is_dir() and parent.name != '.git':
            candidates += parent.iterdir()
    return sorted([str(x) for x in candidates])

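# Usage sketch (hypothetical processor name, not part of the original module): the
# <EXECUTABLE>_PATH variable is derived from the executable name, so for 'ocrd-dummy'
# any directory listed in OCRD_DUMMY_PATH is searched in addition to the locations
# declared in the tool's resource_locations.
#
#     >>> environ['OCRD_DUMMY_PATH'] = '/data/ocrd-dummy-models'
#     >>> list_all_resources('ocrd-dummy')
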
def get_processor_resource_types(executable, ocrd_tool=None):
    """
    Determine what type of resource parameters a processor needs.

    Return a list of MIME types (with the special value `*/*` to
    designate that arbitrary files or directories are allowed).
    """
    if not ocrd_tool:
        # if the processor in question is not installed, assume both files and directories
        if not which(executable):
            return ['*/*']
        ocrd_tool = get_ocrd_tool_json(executable)
    if not next((True for p in ocrd_tool['parameters'].values() if 'content-type' in p), False):
        # None of the parameters for this processor are resources (or the resource
        # parameters are not properly declared), so output both directories and files
        return ['*/*']
    return [p['content-type'] for p in ocrd_tool['parameters'].values()
            if 'content-type' in p]

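# Usage sketch (not part of the original module): with an explicit tool description
# whose parameters declare a content-type, only those MIME types are returned.
#
#     >>> get_processor_resource_types(None, {'parameters': {
#     ...     'model': {'content-type': 'application/zip'}}})
#     ['application/zip']
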
# ht @pabs3
# https://github.com/untitaker/python-atomicwrites/issues/42
class AtomicWriterPerms(AtomicWriter):
    def get_fileobject(self, **kwargs):
        f = super().get_fileobject(**kwargs)
        try:
            mode = stat(self._path).st_mode
        except FileNotFoundError:
            # Creating a new file, emulate what os.open() does
            mask = umask(0)
            umask(mask)
            mode = 0o664 & ~mask
        fd = f.fileno()
        chmod(fd, mode)
        return f

@contextlib.contextmanager
def atomic_write(fpath):
    with atomic_write_(fpath, writer_cls=AtomicWriterPerms, overwrite=True) as f:
        yield f


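# Usage sketch (hypothetical path, not part of the original module): the file only
# appears under its final name once the block completes, and AtomicWriterPerms keeps
# the permissions of an existing file (or applies a umask-derived default).
#
#     >>> with atomic_write('/tmp/ocrd-tool.json') as f:
#     ...     f.write('{"version": "1.0.0"}')
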
def is_file_in_directory(directory, file):
    """
    Return True if ``file`` is in ``directory`` (by checking that the components
    of ``directory`` are a prefix of ``file.parts``)
    """
    directory = Path(directory)
    file = Path(file)
    return list(file.parts)[:len(directory.parts)] == list(directory.parts)

def itertree(path):
    """
    Recursively enumerate ``path``, yielding every file and directory below it
    (and finally ``path`` itself)
    """
    if not isinstance(path, Path):
        path = Path(path)
    if path.is_dir():
        for subpath in path.iterdir():
            yield from itertree(subpath)
    yield path

def directory_size(path):
    """
    Calculate the total size (in bytes) of all files in directory ``path``
    """
    path = Path(path)
    return sum(f.stat().st_size for f in path.glob('**/*') if f.is_file())
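# Usage sketch for the path helpers above (hypothetical paths, not part of the
# original module):
#
#     >>> is_file_in_directory('/data/ws1', '/data/ws1/OCR-D-IMG/page1.tif')
#     True
#     >>> [str(p) for p in itertree('/data/ws1')]   # all files and directories, children first
#     >>> directory_size('/data/ws1')               # total size of contained files, in bytes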