Passed
Pull Request — master (#1319)
by Konstantin
03:56
created

ocrd_utils.os.abspath()   A

Complexity

Conditions 2

Size

Total Lines 9
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 4
dl 0
loc 9
rs 10
c 0
b 0
f 0
cc 2
nop 1
1
"""
2
Operating system functions.
3
"""
4
__all__ = [
5
    'abspath',
6
    'directory_size',
7
    'is_file_in_directory',
8
    'get_ocrd_tool_json',
9
    'get_moduledir',
10
    'get_processor_resource_types',
11
    'get_env_locations',
12
    'guess_media_type',
13
    'pushd_popd',
14
    'unzip_file_to_dir',
15
    'atomic_write',
16
    'redirect_stderr_and_stdout_to_file',
17
]
18
19
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union
20
from tempfile import TemporaryDirectory, gettempdir
21
from functools import lru_cache
22
from contextlib import contextmanager, redirect_stderr, redirect_stdout
23
from shutil import which
24
from json import loads
25
from json.decoder import JSONDecodeError
26
from os import getcwd, chdir, stat, chmod, umask, environ, PathLike
27
from pathlib import Path
28
from os.path import abspath as abspath_, join
29
from zipfile import ZipFile
30
from subprocess import run, PIPE
31
from mimetypes import guess_type as mimetypes_guess
32
from filetype import guess as filetype_guess
33
from fnmatch import filter as apply_glob
34
35
from atomicwrites import atomic_write as atomic_write_, AtomicWriter
36
37
from .constants import EXT_TO_MIME, MIME_TO_EXT, RESOURCE_LOCATIONS, RESOURCES_DIR_SYSTEM
38
from .config import config
39
from .logging import getLogger
40
from .introspect import resource_string
41
42
def abspath(url : str) -> str:
    """
    Resolve a filesystem path or ``file://`` URL to an absolute path.

    A leading ``file://`` scheme is cut off first; whatever remains is
    handed to :py:func:`os.path.abspath`.
    """
    scheme = 'file://'
    path = url[len(scheme):] if url.startswith(scheme) else url
    return abspath_(path)
51
52
@contextmanager
def pushd_popd(newcwd : Union[str, PathLike] = None, tempdir : bool = False) -> Iterator[PathLike]:
    """
    Temporarily change the working directory for the duration of a ``with`` block.

    Args:
        newcwd: directory to change into. If empty or ``None`` (and ``tempdir``
            is false), the working directory is left untouched.
        tempdir: if true, create a fresh temporary directory, change into it,
            and remove it again when the context exits.

    Yields:
        The resolved :py:class:`pathlib.Path` of the directory the context
        runs in. Without ``newcwd`` and ``tempdir`` this is the directory
        that will be restored on exit.

    Raises:
        Exception: if both ``newcwd`` and ``tempdir`` are given.
    """
    if newcwd and tempdir:
        raise Exception("pushd_popd can accept either newcwd or tempdir, not both")
    try:
        oldcwd = getcwd()
    except FileNotFoundError:
        # This happens when a directory is deleted before the context is exited
        oldcwd = gettempdir()
    try:
        if tempdir:
            with TemporaryDirectory() as tempcwd:
                chdir(tempcwd)
                try:
                    yield Path(tempcwd).resolve()
                finally:
                    # Step out before TemporaryDirectory cleans up: removing
                    # the current working directory fails on some platforms.
                    chdir(oldcwd)
        elif newcwd:
            # Resolve BEFORE changing directory: a relative path resolved
            # afterwards would be interpreted relative to the new directory.
            target = Path(newcwd).resolve()
            chdir(newcwd)
            yield target
        else:
            # Nothing to change into (Path(None) would raise TypeError).
            yield Path(oldcwd).resolve()
    finally:
        chdir(oldcwd)
72
73
def unzip_file_to_dir(path_to_zip : Union[str, PathLike], output_directory : str) -> None:
    """
    Unpack every member of the ZIP archive ``path_to_zip`` below ``output_directory``.
    """
    archive = ZipFile(path_to_zip, 'r')
    with archive:
        archive.extractall(output_directory)
79
80
@lru_cache()
def get_ocrd_tool_json(executable : str) -> Dict[str, Any]:
    """
    Get the ``ocrd-tool`` description of ``executable``.

    The packaged ``ocrd-all-tool.json`` resource is consulted first. If it
    cannot be read or parsed, or has no entry for ``executable``, the
    executable itself is run with ``--dump-json`` and its standard output is
    parsed instead. Failures of that fallback are logged, not raised.

    The returned dict always has a ``resource_locations`` key (defaulting to
    ``RESOURCE_LOCATIONS``); it is empty otherwise when no description could
    be obtained. Results are memoized per ``executable`` via ``lru_cache``,
    so repeated calls return the very same dict object.
    """
    logger_name = 'ocrd.utils.get_ocrd_tool_json'
    ocrd_tool = {}
    try:
        ocrd_all_tool = loads(resource_string('ocrd', 'ocrd-all-tool.json'))
        ocrd_tool = ocrd_all_tool[executable]
    except (JSONDecodeError, OSError, KeyError):
        try:
            ocrd_tool = loads(run([executable, '--dump-json'], stdout=PIPE, check=False).stdout)
        except (JSONDecodeError, OSError) as e:
            getLogger(logger_name).error(f'{executable} --dump-json produced invalid JSON: {e}')
    if not isinstance(ocrd_tool, dict):
        # Valid JSON that is not an object (e.g. ``null`` or a list) cannot
        # carry a tool description and would break the key handling below.
        getLogger(logger_name).error(
            f'{executable} --dump-json produced a {type(ocrd_tool).__name__} instead of a JSON object')
        ocrd_tool = {}
    if 'resource_locations' not in ocrd_tool:
        ocrd_tool['resource_locations'] = RESOURCE_LOCATIONS
    return ocrd_tool
98
99
@lru_cache()
def get_moduledir(executable : str) -> Optional[str]:
    """
    Get the module directory of ``executable``.

    The packaged ``ocrd-all-module-dir.json`` resource is consulted first. If
    it cannot be read or parsed, or has no entry for ``executable``, the
    executable itself is run with ``--dump-module-dir`` and its standard
    output (minus trailing newlines) is used.

    Returns ``None`` if the executable could not be run; that failure is
    logged, not raised. Results are memoized per ``executable``.
    """
    moduledir = None
    try:
        ocrd_all_moduledir = loads(resource_string('ocrd', 'ocrd-all-module-dir.json'))
        moduledir = ocrd_all_moduledir[executable]
    except (JSONDecodeError, OSError, KeyError):
        try:
            moduledir = run([executable, '--dump-module-dir'], encoding='utf-8', stdout=PIPE, check=False).stdout.rstrip('\n')
        except OSError as e:
            # no JSON is parsed in this fallback, so only OS-level failures
            # (e.g. executable not found) are expected here
            getLogger('ocrd.utils.get_moduledir').error(f'{executable} --dump-module-dir failed: {e}')
    return moduledir
111
112
def get_env_locations(executable: str) -> List[str]:
    """
    Look up the processor-specific search path from the environment.

    The variable name is ``executable`` upper-cased, with dashes turned into
    underscores and ``_PATH`` appended. Its value is split on ``:``; an unset
    variable yields an empty list.
    """
    processor_path_var = '%s_PATH' % executable.replace('-', '_').upper()
    try:
        raw_value = environ[processor_path_var]
    except KeyError:
        return []
    return raw_value.split(':')
117
118
def list_resource_candidates(executable : str, fname : str, cwd : Optional[str] = None, moduled : Optional[str] = None, xdg_data_home : Optional[str] = None) -> List[str]:
    """
    Build the ordered list of paths where the resource ``fname`` of processor
    ``executable`` may live, according to
    https://ocr-d.de/en/spec/ocrd_tool#file-parameters

    Order: ``cwd`` (defaults to the current working directory), each entry of
    the processor's ``*_PATH`` environment variable, the ``ocrd-resources``
    directory below ``xdg_data_home`` (defaults to ``config.XDG_DATA_HOME``),
    the system resource directory, and finally ``moduled`` if given.
    """
    base_dir = getcwd() if cwd is None else cwd
    found = [join(base_dir, fname)]
    data_home = xdg_data_home or config.XDG_DATA_HOME
    found.extend(join(env_dir, fname) for env_dir in get_env_locations(executable))
    found.append(join(data_home, 'ocrd-resources', executable, fname))
    found.append(join(RESOURCES_DIR_SYSTEM, executable, fname))
    if moduled:
        found.append(join(moduled, fname))
    return found
135
136
def list_all_resources(executable : str, ocrd_tool : Optional[Dict[str, Any]] = None, moduled : Optional[str] = None, xdg_data_home : Optional[str] = None) -> List[str]:
    """
    List all processor resources in the filesystem according to
    https://ocr-d.de/en/spec/ocrd_tool#resource-parameters

    Args:
        executable: name of the processor executable.
        ocrd_tool: its ``ocrd-tool`` description; looked up via
            ``get_ocrd_tool_json`` when ``None``.
        moduled: the processor's module directory; only searched when given
            and ``module`` is among the tool's resource locations.
        xdg_data_home: base of the ``data`` location; defaults to
            ``config.XDG_DATA_HOME``.

    Returns:
        The sorted paths (as strings) of all entries found in the searched
        locations whose type is acceptable for the processor's resource
        parameters.
    """
    xdg_data_home = xdg_data_home or config.XDG_DATA_HOME
    if ocrd_tool is None:
        ocrd_tool = get_ocrd_tool_json(executable)
    # processor we're looking for might not be installed, hence the fallbacks
    try:
        mimetypes = get_processor_resource_types(executable, ocrd_tool=ocrd_tool)
    except KeyError:
        mimetypes = ['*/*']
    try:
        resource_locations = ocrd_tool['resource_locations']
    except KeyError:
        # Assume the default
        resource_locations = RESOURCE_LOCATIONS
    try:
        # fixme: if resources_list contains directories, their "suffix" will interfere
        # (e.g. dirname without dot means we falsely match files without suffix)
        resource_suffixes = [Path(res['name']).suffix
                             for res in ocrd_tool['resources']]
    except KeyError:
        resource_suffixes = []
    logger = getLogger('ocrd.utils.list_all_resources')
    candidates = []
    # cwd would list too many false positives:
    # if 'cwd' in resource_locations:
    #     cwddir = Path.cwd()
    #     candidates.append(cwddir.itertree())
    # but we do not use this anyway:
    # relative paths are tried w.r.t. CWD
    # prior to list_all_resources resolution.
    for processor_path in get_env_locations(executable):
        processor_path = Path(processor_path)
        if processor_path.is_dir():
            candidates += processor_path.iterdir()
    if 'data' in resource_locations:
        datadir = Path(xdg_data_home, 'ocrd-resources', executable)
        if datadir.is_dir():
            candidates += datadir.iterdir()
    if 'system' in resource_locations:
        systemdir = Path(RESOURCES_DIR_SYSTEM, executable)
        if systemdir.is_dir():
            candidates += systemdir.iterdir()
    if 'module' in resource_locations and moduled:
        # Python distributions do not distinguish between
        # code and data; `is_resource()` only singles out
        # files over directories; but we want data files only
        # todo: more code and cache exclusion patterns!
        excluded_patterns = (
            '*.py', '*.py[cod]', '*~', '.*.swp', '*.swo',
            '__pycache__/*', '*.egg-info/*', '*.egg',
            'copyright.txt', 'LICENSE*', 'README.md', 'MANIFEST',
            'TAGS', '.DS_Store',
            # C extensions
            '*.so',
            # translations
            '*.mo', '*.pot',
            '*.log', '*.orig', '*.BAK',
            '.git/*',
            # our stuff
            'ocrd-tool.json',
            'environment.pickle', 'resource_list.yml', 'lib.bash',
        )
        # only the direct children of the module directory are considered:
        # subdirectories are skipped, there is no recursion
        for resource in Path(moduled).iterdir():
            if resource.is_dir():
                continue
            if any(resource.match(pattern) for pattern in excluded_patterns):
                logger.debug("ignoring module candidate '%s'", resource)
                continue
            candidates.append(resource)
    if mimetypes != ['*/*']:
        logger.debug("matching candidates for %s by content-type %s", executable, str(mimetypes))
    def valid_resource_type(path):
        # Decide whether ``path`` is acceptable given the processor's
        # declared resource content types (``mimetypes``).
        if '*/*' in mimetypes:
            return True
        if path.is_dir():
            if 'text/directory' not in mimetypes:
                logger.debug("ignoring directory candidate '%s'", path)
                return False
            if path.name in ['.git']:
                logger.debug("ignoring directory candidate '%s'", path)
                return False
            return True
        if not path.is_file():
            logger.warning("ignoring non-file, non-directory candidate '%s'", path)
            return False
        # fallback='' keeps guess_media_type from raising on unknown types
        res_mimetype = guess_media_type(path, fallback='')
        if res_mimetype == 'application/json':
            # always accept, regardless of configured mimetypes:
            # needed for distributing or sharing parameter preset files
            return True
        if ['text/directory'] == mimetypes:
            logger.debug("ignoring non-directory candidate '%s'", path)
            return False
        if 'application/octet-stream' in mimetypes:
            # catch-all type - do not enforce anything
            return True
        if path.suffix in resource_suffixes:
            return True
        if any(path.suffix == MIME_TO_EXT.get(mime, None)
               for mime in mimetypes):
            return True
        if not res_mimetype:
            logger.warning("cannot determine content type of candidate '%s'", path)
            return True
        # declared types may be glob patterns (e.g. ``image/*``)
        if any(apply_glob([res_mimetype], mime)
               for mime in mimetypes):
            return True
        logger.debug("ignoring %s candidate '%s'", res_mimetype, path)
        return False
    # return a real list, as the signature promises (not a lazy ``map``)
    return [str(candidate)
            for candidate in sorted(filter(valid_resource_type, candidates))]
251
252
def get_processor_resource_types(executable : str, ocrd_tool : Optional[Dict[str, Any]] = None) -> List[str]:
    """
    Collect the content types of a processor's resource parameters.

    A parameter counts as a resource when its ``type`` is ``string``, its
    ``format`` is ``uri`` and it declares a ``content-type``; that value is
    split on commas.

    Return the list of MIME types found. The single entry ``*/*`` is returned
    instead (meaning arbitrary files or directories are allowed) when no tool
    description is given and the executable is not installed, or when no
    parameter qualifies.
    """
    if not ocrd_tool:
        # if the processor in question is not installed, assume both files and directories
        if not which(executable):
            return ['*/*']
        ocrd_tool = get_ocrd_tool_json(executable)
    found_types = []
    for spec in ocrd_tool.get('parameters', {}).values():
        if spec['type'] != 'string':
            continue
        if spec.get('format', '') != 'uri':
            continue
        if 'content-type' not in spec:
            continue
        found_types.extend(spec['content-type'].split(','))
    # None of the parameters for this processor are resources
    # (or the parameters' resource types are not properly declared,)
    # so output both directories and files
    return found_types if found_types else ['*/*']
274
275
# ht @pabs3
# https://github.com/untitaker/python-atomicwrites/issues/42
class AtomicWriterPerms(AtomicWriter):
    """
    ``AtomicWriter`` that sets the permission bits of the file object it
    writes to: those of the existing target file, or ``0o664`` restricted by
    the current umask when the target does not exist yet.
    """
    def get_fileobject(self, **kwargs):
        """
        Return the parent class's file object after applying the mode to it.
        """
        fileobj = super().get_fileobject(**kwargs)
        try:
            perms = stat(self._path).st_mode
        except FileNotFoundError:
            # Creating a new file, emulate what os.open() does
            # (umask can only be read by setting it, so restore it right away)
            current_umask = umask(0)
            umask(current_umask)
            perms = 0o664 & ~current_umask
        chmod(fileobj.fileno(), perms)
        return fileobj
290
291
@contextmanager
def atomic_write(fpath : str) -> Iterator[str]:
    """
    Context manager wrapping ``atomicwrites.atomic_write`` for ``fpath``,
    using ``AtomicWriterPerms`` as writer class and allowing an existing
    file to be overwritten. Yields whatever the wrapped context manager
    provides.
    """
    writer = atomic_write_(fpath, writer_cls=AtomicWriterPerms, overwrite=True)
    with writer as handle:
        yield handle
295
296
297
def is_file_in_directory(directory : Union[str, PathLike], file : Union[str, PathLike]) -> bool:
    """
    Tell whether ``file`` lies below ``directory``, judged purely by path
    components: the leading parts of ``file`` must equal all parts of
    ``directory``. The filesystem is not consulted.
    """
    dir_parts = Path(directory).parts
    file_parts = Path(file).parts
    return file_parts[:len(dir_parts)] == dir_parts
304
305
def itertree(path : Union[str, PathLike]) -> PathLike:
    """
    Walk ``path`` recursively, yielding the contents of every directory
    before the directory itself; ``path`` is therefore yielded last.
    """
    root = path if isinstance(path, Path) else Path(path)
    if root.is_dir():
        for child in root.iterdir():
            for descendant in itertree(child):
                yield descendant
    yield root
315
316
def directory_size(path : Union[str, PathLike]) -> int:
    """
    Add up the sizes in bytes of all regular files found anywhere below
    the directory ``path``.
    """
    total = 0
    for entry in Path(path).glob('**/*'):
        if entry.is_file():
            total += entry.stat().st_size
    return total
322
323
def guess_media_type(input_file : str, fallback : Optional[str] = None, application_xml : str = 'application/xml') -> str:
    """
    Guess the media type of a file path

    Three sources are tried in order: ``filetype.guess``, then
    ``mimetypes.guess_type``, then a lookup of the file's concatenated
    suffixes in ``EXT_TO_MIME``.

    Args:
        input_file: path of the file to inspect.
        fallback: value to use when none of the sources knows the type.
        application_xml: value to return in place of ``application/xml``.

    Raises:
        ValueError: if the type cannot be determined and ``fallback`` is ``None``.
    """
    mimetype = filetype_guess(input_file)
    if mimetype is not None:
        mimetype = mimetype.mime
    else:
        mimetype = mimetypes_guess(input_file)[0]
    if mimetype is None:
        mimetype = EXT_TO_MIME.get(''.join(Path(input_file).suffixes), fallback)
    if mimetype is None:
        # exceptions do not interpolate logging-style arguments,
        # so the message must be formatted here
        raise ValueError(f"Could not determine MIME type of input_file '{input_file}'")
    if mimetype == 'application/xml':
        mimetype = application_xml
    return mimetype
339
340
@contextmanager
def redirect_stderr_and_stdout_to_file(filename):
    """
    Context manager that sends everything written to ``sys.stdout`` and
    ``sys.stderr`` inside the ``with`` block to ``filename``, which is
    opened in text append mode with UTF-8 encoding.
    """
    with open(filename, 'at', encoding='utf-8') as sink, \
            redirect_stderr(sink), redirect_stdout(sink):
        yield
345