ocrd_utils.os.atomic_write()   A
last analyzed

Complexity

Conditions 2

Size

Total Lines 4
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 4
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 2
nop 1
1
"""
2
Operating system functions.
3
"""
4
# Names exported by ``from <this module> import *``.
# Every public, documented helper defined in this module is listed,
# including the resource-listing helpers and ``itertree``.
__all__ = [
    'abspath',
    'directory_size',
    'is_file_in_directory',
    'get_ocrd_tool_json',
    'get_moduledir',
    'get_processor_resource_types',
    'guess_media_type',
    'itertree',
    'list_all_resources',
    'list_resource_candidates',
    'pushd_popd',
    'unzip_file_to_dir',
    'atomic_write',
    'redirect_stderr_and_stdout_to_file',
]
17
18
from tempfile import TemporaryDirectory, gettempdir
19
from functools import lru_cache
20
from contextlib import contextmanager, redirect_stderr, redirect_stdout
21
from shutil import which
22
from json import loads
23
from json.decoder import JSONDecodeError
24
from os import getcwd, chdir, stat, chmod, umask, environ
25
from pathlib import Path
26
from os.path import abspath as abspath_, join
27
from zipfile import ZipFile
28
from subprocess import run, PIPE
29
from mimetypes import guess_type as mimetypes_guess
30
from filetype import guess as filetype_guess
31
32
from atomicwrites import atomic_write as atomic_write_, AtomicWriter
33
34
from .constants import EXT_TO_MIME
35
from .config import config
36
from .logging import getLogger
37
from .introspect import resource_string
38
39
def abspath(url):
    """
    Resolve a file path or ``file://`` URL to an absolute path.

    A leading ``file://`` scheme is stripped, then the remainder is
    passed to :func:`os.path.abspath`.
    """
    scheme = 'file://'
    local_path = url[len(scheme):] if url.startswith(scheme) else url
    return abspath_(local_path)
48
49
@contextmanager
def pushd_popd(newcwd=None, tempdir=False):
    """
    Context manager that changes the working directory and restores it on exit.

    Args:
        newcwd: Directory to change into. If falsy (and ``tempdir`` is
            false as well), the working directory is left unchanged.
        tempdir (bool): If true, create a temporary directory, change
            into it, and remove it again on exit. Mutually exclusive
            with ``newcwd``.

    Yields:
        pathlib.Path: Resolved path of the directory that is current
        inside the context.

    Raises:
        Exception: If both ``newcwd`` and ``tempdir`` are given.
    """
    if newcwd and tempdir:
        raise Exception("pushd_popd can accept either newcwd or tempdir, not both")
    try:
        oldcwd = getcwd()
    except FileNotFoundError:
        # This happens when a directory is deleted before the context is exited
        oldcwd = gettempdir()
    try:
        if tempdir:
            with TemporaryDirectory() as tempcwd:
                chdir(tempcwd)
                yield Path(tempcwd).resolve()
        elif newcwd:
            # Resolve *before* changing directory: a relative ``newcwd``
            # resolved afterwards would be interpreted relative to itself.
            target = Path(newcwd).resolve()
            chdir(target)
            yield target
        else:
            # Nothing to change into; ``Path(None)`` would raise TypeError,
            # so report the directory we are staying in.
            yield Path(oldcwd).resolve()
    finally:
        chdir(oldcwd)
69
70
def unzip_file_to_dir(path_to_zip, output_directory):
    """
    Unpack every member of the ZIP archive ``path_to_zip`` into
    ``output_directory``.
    """
    archive = ZipFile(path_to_zip, 'r')
    with archive:
        archive.extractall(path=output_directory)
76
77
@lru_cache()
def get_ocrd_tool_json(executable):
    """
    Get the ``ocrd-tool`` description of ``executable``.

    The description is first looked up under the key ``executable`` in the
    ``ocrd-all-tool.json`` resource of the ``ocrd`` package. If that
    resource cannot be read or parsed, or has no such key, ``executable``
    is run with ``--dump-json`` and its standard output is parsed instead.
    If that fails too, the error is logged and an otherwise empty
    description is used.

    In every case a missing ``resource_locations`` entry is filled in with
    the default ``['data', 'cwd', 'system', 'module']``.

    Results are memoized per ``executable`` by ``lru_cache``, so repeated
    calls return the same dict object.
    """
    ocrd_tool = {}
    try:
        ocrd_all_tool = loads(resource_string('ocrd', 'ocrd-all-tool.json'))
        ocrd_tool = ocrd_all_tool[executable]
    except (JSONDecodeError, OSError, KeyError):
        try:
            # check=False: a non-zero exit status is not an error by itself,
            # only unparsable output (or a failure to start the process) is
            ocrd_tool = loads(run([executable, '--dump-json'], stdout=PIPE, check=False).stdout)
        except (JSONDecodeError, OSError) as e:
            getLogger('ocrd.utils.get_ocrd_tool_json').error(f'{executable} --dump-json produced invalid JSON: {e}')
    if 'resource_locations' not in ocrd_tool:
        ocrd_tool['resource_locations'] = ['data', 'cwd', 'system', 'module']
    return ocrd_tool
95
96
@lru_cache()
def get_moduledir(executable):
    """
    Determine the module directory of ``executable``.

    The ``ocrd-all-module-dir.json`` resource of the ``ocrd`` package is
    consulted first; if it cannot be used, ``executable`` is run with
    ``--dump-module-dir`` and its output (minus trailing newlines) is
    returned. If that cannot be run either, the error is logged and
    ``None`` is returned.
    """
    try:
        return loads(resource_string('ocrd', 'ocrd-all-module-dir.json'))[executable]
    except (JSONDecodeError, OSError, KeyError):
        pass
    try:
        completed = run([executable, '--dump-module-dir'], encoding='utf-8', stdout=PIPE, check=False)
        return completed.stdout.rstrip('\n')
    except (JSONDecodeError, OSError) as e:
        getLogger('ocrd.utils.get_moduledir').error(f'{executable} --dump-module-dir failed: {e}')
    return None
108
109
def list_resource_candidates(executable, fname, cwd=None, moduled=None, xdg_data_home=None):
    """
    Generate candidates for processor resources according to
    https://ocr-d.de/en/spec/ocrd_tool#file-parameters

    Args:
        executable (str): Name of the processor executable.
        fname (str): File name of the resource.
        cwd (str): Directory for the first candidate. Defaults to the
            working directory *at call time*.
        moduled (str): Module directory of the processor; if falsy, no
            module candidate is produced.
        xdg_data_home (str): Data home directory; if falsy,
            ``config.XDG_DATA_HOME`` is used.

    Returns:
        list: Candidate paths, in order: ``cwd``, each entry of the
        ``<EXECUTABLE>_PATH`` environment variable (if set), the data
        home, ``/usr/local/share/ocrd-resources`` and finally ``moduled``.
    """
    if cwd is None:
        # Evaluated here rather than in the signature, where it would be
        # frozen to the working directory at import time.
        cwd = getcwd()
    candidates = []
    candidates.append(join(cwd, fname))
    xdg_data_home = config.XDG_DATA_HOME if not xdg_data_home else xdg_data_home
    processor_path_var = '%s_PATH' % executable.replace('-', '_').upper()
    if processor_path_var in environ:
        candidates += [join(x, fname) for x in environ[processor_path_var].split(':')]
    candidates.append(join(xdg_data_home, 'ocrd-resources', executable, fname))
    candidates.append(join('/usr/local/share/ocrd-resources', executable, fname))
    if moduled:
        candidates.append(join(moduled, fname))
    return candidates
125
126
def list_all_resources(executable, moduled=None, xdg_data_home=None):
    """
    List all processor resources in the filesystem according to
    https://ocr-d.de/en/spec/ocrd_tool#file-parameters

    Args:
        executable (str): Name of the processor executable.
        moduled (str): Module directory of the processor; only searched
            if given and ``module`` is among the resource locations.
        xdg_data_home (str): Data home directory; if falsy,
            ``config.XDG_DATA_HOME`` is used.

    Returns:
        list: Sorted list of path strings. Entries of the path variable,
        data and system directories are listed together with the contents
        of those entries that are directories (one level deep); the module
        directory is searched recursively for non-code files.
    """
    candidates = []
    try:
        resource_locations = get_ocrd_tool_json(executable)['resource_locations']
    except FileNotFoundError:
        # processor we're looking for resource_locations of is not installed.
        # Assume the default
        resource_locations = ['data', 'cwd', 'system', 'module']
    xdg_data_home = config.XDG_DATA_HOME if not xdg_data_home else xdg_data_home
    # XXX the 'cwd' location is deliberately not searched:
    # it would list too many false positives
    processor_path_var = '%s_PATH' % executable.replace('-', '_').upper()
    if processor_path_var in environ:
        for processor_path in environ[processor_path_var].split(':'):
            if Path(processor_path).is_dir():
                candidates += Path(processor_path).iterdir()
    if 'data' in resource_locations:
        datadir = Path(xdg_data_home, 'ocrd-resources', executable)
        if datadir.is_dir():
            candidates += datadir.iterdir()
    if 'system' in resource_locations:
        systemdir = Path('/usr/local/share/ocrd-resources', executable)
        if systemdir.is_dir():
            candidates += systemdir.iterdir()
    if 'module' in resource_locations and moduled:
        # recurse fully
        for resource in itertree(Path(moduled)):
            if resource.is_dir():
                continue
            if any(resource.match(pattern) for pattern in
                   # Python distributions do not distinguish between
                   # code and data; `is_resource()` only singles out
                   # files over directories; but we want data files only
                   # todo: more code and cache exclusion patterns!
                   ['*.py', '*.py[cod]', '*~', 'ocrd-tool.json',
                    'environment.pickle', 'resource_list.yml', 'lib.bash']):
                continue
            candidates.append(resource)
    # recurse once: iterate over a snapshot, because extending the list
    # that is being iterated would also descend into the new entries
    # (i.e. recurse without bound)
    for parent in list(candidates):
        if parent.is_dir() and parent.name != '.git':
            candidates += parent.iterdir()
    return sorted(str(x) for x in candidates)
176
177
def get_processor_resource_types(executable, ocrd_tool=None):
    """
    Determine what type of resource parameters a processor needs.

    Return a list of MIME types (with the special value `*/*` to
    designate that arbitrary files or directories are allowed).
    """
    any_type = ['*/*']
    if not ocrd_tool:
        # a processor that is not installed may take both files and directories
        if not which(executable):
            return any_type
        ocrd_tool = get_ocrd_tool_json(executable)
    declared_types = [
        param['content-type']
        for param in ocrd_tool.get('parameters', {}).values()
        if 'content-type' in param
    ]
    # No parameter declares a content type: either the processor has no
    # resource parameters or they are not properly declared, so allow
    # both directories and files
    return declared_types or any_type
196
197
# ht @pabs3
198
# https://github.com/untitaker/python-atomicwrites/issues/42
199
class AtomicWriterPerms(AtomicWriter):
    """
    ``AtomicWriter`` variant that sets the permission bits of the file
    object it hands out.

    If the target path already exists, its mode is copied; otherwise
    ``0o664`` restricted by the process umask is applied.
    """

    def get_fileobject(self, **kwargs):
        """
        Obtain the file object from the parent class and ``chmod`` it.
        """
        fileobj = super().get_fileobject(**kwargs)
        try:
            perms = stat(self._path).st_mode
        except FileNotFoundError:
            # Creating a new file, emulate what os.open() does.
            # The umask can only be read by setting it, so set and restore.
            current_umask = umask(0)
            umask(current_umask)
            perms = 0o664 & ~current_umask
        chmod(fileobj.fileno(), perms)
        return fileobj
212
213
@contextmanager
def atomic_write(fpath):
    """
    Context manager yielding a file object for atomically writing ``fpath``.

    Delegates to ``atomicwrites.atomic_write`` with ``overwrite=True`` and
    ``AtomicWriterPerms`` as the writer class.
    """
    writer = atomic_write_(fpath, writer_cls=AtomicWriterPerms, overwrite=True)
    with writer as handle:
        yield handle
217
218
219
def is_file_in_directory(directory, file):
    """
    Return True if ``file`` is in ``directory``, i.e. if the path components
    of ``directory`` are a prefix of the path components of ``file``.

    The comparison is purely lexical; the filesystem is not consulted.
    """
    dir_parts = Path(directory).parts
    file_parts = Path(file).parts
    return file_parts[:len(dir_parts)] == dir_parts
226
227
def itertree(path):
    """
    Recursively enumerate ``path``, yielding ``Path`` objects.

    The contents of a directory are yielded before the directory itself;
    ``path`` is therefore always the last item.
    """
    root = path if isinstance(path, Path) else Path(path)
    if root.is_dir():
        for child in root.iterdir():
            yield from itertree(child)
    yield root
237
238
def directory_size(path):
    """
    Return the combined size in bytes of all regular files found
    anywhere below the directory ``path``.
    """
    total = 0
    for entry in Path(path).glob('**/*'):
        if entry.is_file():
            total += entry.stat().st_size
    return total
244
245
def guess_media_type(input_file : str, fallback : str = None, application_xml : str = 'application/xml'):
    """
    Guess the media type of a file path

    The guessers are tried in this order: ``filetype.guess``,
    ``mimetypes.guess_type``, then a lookup of the joined file name
    suffixes in ``EXT_TO_MIME``.

    Args:
        input_file (str): Path of the file to examine.
        fallback (str): Media type to use if no guesser yields a result.
        application_xml (str): Media type to return in place of
            ``application/xml``.

    Returns:
        str: The media type.

    Raises:
        ValueError: If the type cannot be determined and no ``fallback``
            is given.
    """
    mimetype = filetype_guess(input_file)
    if mimetype is not None:
        mimetype = mimetype.mime
    else:
        mimetype = mimetypes_guess(input_file)[0]
    if mimetype is None:
        # All suffixes are joined, so that compound extensions can be matched
        mimetype = EXT_TO_MIME.get(''.join(Path(input_file).suffixes), fallback)
    if mimetype is None:
        raise ValueError(f"Could not determine MIME type of input_file '{input_file}'")
    if mimetype == 'application/xml':
        mimetype = application_xml
    return mimetype
261
262
@contextmanager
def redirect_stderr_and_stdout_to_file(filename):
    """
    Context manager that appends everything written to ``sys.stderr`` and
    ``sys.stdout`` inside the context to the UTF-8 text file ``filename``.
    """
    with open(filename, 'at', encoding='utf-8') as logfile, \
            redirect_stderr(logfile), redirect_stdout(logfile):
        yield
267