Passed
Pull Request — master (#1083)
by Konstantin
02:44
created

ocrd_utils.os.get_moduledir()   A

Complexity

Conditions 2

Size

Total Lines 8
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 8
dl 0
loc 8
rs 10
c 0
b 0
f 0
cc 2
nop 1
1
"""
2
Operating system functions.
3
"""
4
# Names exported by a star-import of this module (kept in alphabetical order).
__all__ = [
    'abspath',
    'atomic_write',
    'directory_size',
    'get_moduledir',
    'get_ocrd_tool_json',
    'get_processor_resource_types',
    'guess_media_type',
    'is_file_in_directory',
    'pushd_popd',
    'redirect_stderr_and_stdout_to_file',
    'unzip_file_to_dir',
]
17
18
from tempfile import TemporaryDirectory, gettempdir
19
from functools import lru_cache
20
from contextlib import contextmanager, redirect_stderr, redirect_stdout
21
from distutils.spawn import find_executable as which
22
from json import loads
23
from json.decoder import JSONDecodeError
24
from os import getcwd, chdir, stat, chmod, umask, environ
25
from pathlib import Path
26
from os.path import exists, abspath as abspath_, join, isdir
27
from zipfile import ZipFile
28
from subprocess import run, PIPE
29
from mimetypes import guess_type as mimetypes_guess
30
from filetype import guess as filetype_guess
31
32
from atomicwrites import atomic_write as atomic_write_, AtomicWriter
33
34
from .constants import EXT_TO_MIME
35
from .config import config
36
from .logging import getLogger
37
38
def abspath(url):
    """
    Return the absolute filesystem path for a path or ``file://`` URL.

    A leading ``file://`` scheme is stripped first; the remainder is made
    absolute with ``os.path.abspath``.
    """
    scheme = 'file://'
    path = url[len(scheme):] if url.startswith(scheme) else url
    return abspath_(path)
47
48
@contextmanager
def pushd_popd(newcwd=None, tempdir=False):
    """
    Temporarily change the working directory, restoring it on exit.

    Args:
        newcwd: directory to change into. If empty or ``None`` (and
            ``tempdir`` is false) the working directory is left unchanged.
        tempdir: if true, create a temporary directory, change into it and
            remove it again when the context exits.

    Yields:
        pathlib.Path: the resolved directory in effect inside the context.
        When neither ``newcwd`` nor ``tempdir`` is given this is the
        directory that will be restored on exit.

    Raises:
        Exception: if both ``newcwd`` and ``tempdir`` are given.
    """
    if newcwd and tempdir:
        raise Exception("pushd_popd can accept either newcwd or tempdir, not both")
    try:
        oldcwd = getcwd()
    except FileNotFoundError:
        # This happens when a directory is deleted before the context is exited
        oldcwd = gettempdir()
    try:
        if tempdir:
            with TemporaryDirectory() as tempcwd:
                chdir(tempcwd)
                yield Path(tempcwd).resolve()
        elif newcwd:
            chdir(newcwd)
            # Ask the OS where we are instead of resolving ``newcwd`` again:
            # a relative ``newcwd`` would otherwise be resolved against the
            # directory we just changed into.
            yield Path(getcwd()).resolve()
        else:
            # Nothing to change into; Path(None) would raise TypeError.
            yield Path(oldcwd).resolve()
    finally:
        chdir(oldcwd)
68
69
def unzip_file_to_dir(path_to_zip, output_directory):
    """
    Extract a ZIP archive to a directory

    Args:
        path_to_zip: path of the ZIP archive to read.
        output_directory: directory the archive members are extracted into.
    """
    # The context manager closes the archive even if extraction fails.
    with ZipFile(path_to_zip, 'r') as archive:
        archive.extractall(output_directory)
76
77
@lru_cache()
def get_ocrd_tool_json(executable):
    """
    Get the ``ocrd-tool`` description of ``executable``.

    Runs ``executable --dump-json`` and parses its standard output as JSON.
    If the executable cannot be run or its output is not valid JSON, the
    error is logged and an empty description is used instead. A missing
    ``resource_locations`` entry is filled in with the default locations.

    The result is cached per ``executable``, so repeated calls return the
    same dict object.
    """
    try:
        ocrd_tool = loads(run([executable, '--dump-json'], stdout=PIPE).stdout)
    except (JSONDecodeError, OSError) as e:
        getLogger('ocrd.utils.get_ocrd_tool_json').error(f'{executable} --dump-json produced invalid JSON: {e}')
        ocrd_tool = {}
    if 'resource_locations' not in ocrd_tool:
        ocrd_tool['resource_locations'] = ['data', 'cwd', 'system', 'module']
    return ocrd_tool
91
92
@lru_cache()
def get_moduledir(executable):
    """
    Get the module directory reported by ``executable``.

    Runs ``executable --dump-module-dir`` and returns its standard output
    (decoded as UTF-8) with trailing newlines removed. If the executable
    cannot be run, the error is logged and ``None`` is returned.

    The result is cached per ``executable``.
    """
    try:
        result = run([executable, '--dump-module-dir'], encoding='utf-8', stdout=PIPE)
    except OSError as e:
        # Only launching the process can fail here; no JSON is parsed.
        getLogger('ocrd.utils.get_moduledir').error(f'{executable} --dump-module-dir failed: {e}')
        return None
    return result.stdout.rstrip('\n')
100
101
def list_resource_candidates(executable, fname, cwd=None, moduled=None, xdg_data_home=None):
    """
    Generate candidates for processor resources according to
    https://ocr-d.de/en/spec/ocrd_tool#file-parameters

    Args:
        executable: name of the processor executable.
        fname: file name of the resource to look for.
        cwd: directory used for the first candidate; defaults to the
            working directory at the time of the call.
        moduled: module directory of the processor; if given, a candidate
            inside it is appended last.
        xdg_data_home: data home directory; falls back to
            ``config.XDG_DATA_HOME`` when empty.

    Returns:
        list of candidate paths (strings), in lookup order. The paths are
        not checked for existence.
    """
    if cwd is None:
        # Resolved per call: a ``getcwd()`` default in the signature would be
        # frozen at import time and go stale after any later chdir.
        cwd = getcwd()
    xdg_data_home = xdg_data_home or config.XDG_DATA_HOME
    candidates = [join(cwd, fname)]
    # e.g. ``ocrd-foo-bar`` is looked up in ``OCRD_FOO_BAR_PATH``
    processor_path_var = '%s_PATH' % executable.replace('-', '_').upper()
    if processor_path_var in environ:
        candidates += [join(x, fname) for x in environ[processor_path_var].split(':')]
    candidates.append(join(xdg_data_home, 'ocrd-resources', executable, fname))
    candidates.append(join('/usr/local/share/ocrd-resources', executable, fname))
    if moduled:
        candidates.append(join(moduled, fname))
    return candidates
117
118
def list_all_resources(executable, moduled=None, xdg_data_home=None):
    """
    List all processor resources in the filesystem according to
    https://ocr-d.de/en/spec/ocrd_tool#file-parameters

    The locations to search are taken from the ``resource_locations`` entry
    of the processor's ocrd-tool description. Directories named by the
    ``<EXECUTABLE>_PATH`` environment variable are always searched.

    Returns a sorted list of paths (as strings).
    """
    try:
        locations = get_ocrd_tool_json(executable)['resource_locations']
    except FileNotFoundError:
        # The processor whose resource locations we want is not installed.
        # Assume the default
        locations = ['data', 'cwd', 'system', 'module']
    if not xdg_data_home:
        xdg_data_home = config.XDG_DATA_HOME

    found = []
    # The 'cwd' location is deliberately not searched: it would list too
    # many false positives.
    env_var = '%s_PATH' % executable.replace('-', '_').upper()
    if env_var in environ:
        for entry in environ[env_var].split(':'):
            entry_dir = Path(entry)
            if entry_dir.is_dir():
                found.extend(entry_dir.iterdir())
    if 'data' in locations:
        data_dir = Path(xdg_data_home, 'ocrd-resources', executable)
        if data_dir.is_dir():
            found.extend(data_dir.iterdir())
    if 'system' in locations:
        system_dir = Path('/usr/local/share/ocrd-resources', executable)
        if system_dir.is_dir():
            found.extend(system_dir.iterdir())
    if 'module' in locations and moduled:
        # Python distributions do not distinguish between code and data;
        # we want data files only, so skip code and cache files.
        # todo: more code and cache exclusion patterns!
        excluded_patterns = ['*.py', '*.py[cod]', '*~', 'ocrd-tool.json',
                             'environment.pickle', 'resource_list.yml', 'lib.bash']
        # recurse fully
        for item in itertree(Path(moduled)):
            if item.is_dir():
                continue
            if not any(item.match(pattern) for pattern in excluded_patterns):
                found.append(item)
    # Descend into collected directories (except ``.git``).
    # NOTE(review): the list grows while it is being iterated, so entries
    # appended here are visited by this same loop as well.
    for item in found:
        if item.is_dir() and item.name != '.git':
            found.extend(item.iterdir())
    return sorted(str(item) for item in found)
168
169
def get_processor_resource_types(executable, ocrd_tool=None):
    """
    Determine what type of resource parameters a processor needs.

    Return a list of MIME types (with the special value `*/*` to
    designate that arbitrary files or directories are allowed).
    """
    if not ocrd_tool:
        if not which(executable):
            # The processor is not installed: assume both files and directories
            return ['*/*']
        ocrd_tool = get_ocrd_tool_json(executable)
    content_types = [
        param['content-type']
        for param in ocrd_tool.get('parameters', {}).values()
        if 'content-type' in param
    ]
    # If no parameter declares a content type (none are resources, or the
    # resource parameters are not properly declared), allow both
    # directories and files.
    return content_types or ['*/*']
188
189
# ht @pabs3
190
# https://github.com/untitaker/python-atomicwrites/issues/42
191
class AtomicWriterPerms(AtomicWriter):
    """
    ``AtomicWriter`` variant that sets permission bits on the file object it
    hands out.
    """

    def get_fileobject(self, **kwargs):
        """
        Return the file object from the parent class after applying a mode.

        The mode is copied from the file at ``self._path`` if that exists;
        otherwise ``0o664`` restricted by the current umask is used.
        """
        fileobj = super().get_fileobject(**kwargs)
        try:
            perms = stat(self._path).st_mode
        except FileNotFoundError:
            # Creating a new file, emulate what os.open() does.
            # The umask can only be read by setting it, so restore it at once.
            current_umask = umask(0)
            umask(current_umask)
            perms = 0o664 & ~current_umask
        chmod(fileobj.fileno(), perms)
        return fileobj
204
205
@contextmanager
def atomic_write(fpath):
    """
    Context manager yielding a file object for atomically writing ``fpath``.

    Delegates to ``atomicwrites.atomic_write`` with overwriting enabled and
    ``AtomicWriterPerms`` as the writer class.
    """
    with atomic_write_(fpath, overwrite=True, writer_cls=AtomicWriterPerms) as handle:
        yield handle
209
210
211
def is_file_in_directory(directory, file):
    """
    Return True if the path components of ``directory`` are a prefix of the
    path components of ``file``.

    The comparison is purely lexical: neither path is resolved or checked
    for existence.
    """
    dir_parts = Path(directory).parts
    file_parts = Path(file).parts
    return file_parts[:len(dir_parts)] == dir_parts
218
219
def itertree(path):
    """
    Recursively enumerate ``path``, yielding ``pathlib.Path`` objects.

    The contents of a directory are yielded before the directory itself;
    ``path`` is therefore always the last item.
    """
    root = path if isinstance(path, Path) else Path(path)
    if root.is_dir():
        for child in root.iterdir():
            for descendant in itertree(child):
                yield descendant
    yield root
229
230
def directory_size(path):
    """
    Calculate the total size in bytes of all files below directory ``path``
    (searched recursively).
    """
    total = 0
    for entry in Path(path).glob('**/*'):
        if entry.is_file():
            total += entry.stat().st_size
    return total
236
237
def guess_media_type(input_file : str, fallback : str = None, application_xml : str = 'application/xml'):
    """
    Guess the media type of a file path

    Tries, in order: ``filetype.guess``, ``mimetypes.guess_type``, and a
    lookup of the joined file suffixes in ``EXT_TO_MIME``.

    Args:
        input_file: path of the file to inspect.
        fallback: media type to use if no other method yields a result.
        application_xml: value returned in place of ``application/xml``.

    Returns:
        the media type as a string.

    Raises:
        ValueError: if no media type could be determined and no
            ``fallback`` was given.
    """
    detected = filetype_guess(input_file)
    if detected is not None:
        mimetype = detected.mime
    else:
        mimetype = mimetypes_guess(input_file)[0]
    if mimetype is None:
        # Join all suffixes so that compound extensions can be looked up
        mimetype = EXT_TO_MIME.get(''.join(Path(input_file).suffixes), fallback)
    if mimetype is None:
        raise ValueError(f"Could not determine MIME type of input_file '{input_file}'")
    if mimetype == 'application/xml':
        mimetype = application_xml
    return mimetype
253
254
@contextmanager
def redirect_stderr_and_stdout_to_file(filename):
    """
    Context manager that appends everything written to ``sys.stdout`` and
    ``sys.stderr`` to the UTF-8 text file ``filename``.
    """
    with open(filename, 'at', encoding='utf-8') as logfile, \
            redirect_stderr(logfile), redirect_stdout(logfile):
        yield
259