Passed
Pull Request — master (#1319)
by Konstantin
02:11
created

ocrd_utils.os.is_git_url()   A

Complexity

Conditions 2

Size

Total Lines 7
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 7
dl 0
loc 7
rs 10
c 0
b 0
f 0
cc 2
nop 1
1
"""
2
Operating system functions.
3
"""
4
__all__ = [
5
    'abspath',
6
    'directory_size',
7
    'is_file_in_directory',
8
    'is_git_url',
9
    'get_ocrd_tool_json',
10
    'get_moduledir',
11
    'get_processor_resource_types',
12
    'get_env_locations',
13
    'guess_media_type',
14
    'pushd_popd',
15
    'unzip_file_to_dir',
16
    'atomic_write',
17
    'redirect_stderr_and_stdout_to_file',
18
]
19
20
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union
21
from tempfile import TemporaryDirectory, gettempdir
22
from functools import lru_cache
23
from contextlib import contextmanager, redirect_stderr, redirect_stdout
24
from shutil import which
25
from json import loads
26
from json.decoder import JSONDecodeError
27
from os import getcwd, chdir, stat, chmod, umask, environ, PathLike
28
from pathlib import Path
29
from os.path import abspath as abspath_, join
30
from zipfile import ZipFile
31
from subprocess import run, PIPE, CalledProcessError
32
from mimetypes import guess_type as mimetypes_guess
33
from filetype import guess as filetype_guess
34
from fnmatch import filter as apply_glob
35
36
from atomicwrites import atomic_write as atomic_write_, AtomicWriter
37
38
from .constants import EXT_TO_MIME, MIME_TO_EXT, RESOURCE_LOCATIONS, RESOURCES_DIR_SYSTEM
39
from .config import config
40
from .logging import getLogger
41
from .introspect import resource_string
42
43
def abspath(url : str) -> str:
    """
    Resolve a local path or ``file://`` URL to an absolute filesystem path.

    A leading ``file://`` scheme is cut off; whatever remains is passed
    to :py:func:`os.path.abspath`.
    """
    scheme = 'file://'
    path = url[len(scheme):] if url.startswith(scheme) else url
    return abspath_(path)
52
53
54
@contextmanager
def pushd_popd(newcwd : Union[str, PathLike] = None, tempdir : bool = False) -> Iterator[PathLike]:
    """
    Temporarily change the working directory and restore it on exit.

    Args:
        newcwd: directory to change into. If unset (and ``tempdir`` is
            false) the working directory is left untouched.
        tempdir: if true, create a temporary directory, change into it
            and remove it again when the context exits.

    Yields:
        The resolved path of the directory the context body runs in.

    Raises:
        Exception: if both ``newcwd`` and ``tempdir`` are given.
    """
    if newcwd and tempdir:
        raise Exception("pushd_popd can accept either newcwd or tempdir, not both")
    try:
        oldcwd = getcwd()
    except FileNotFoundError:
        # This happens when a directory is deleted before the context is exited
        oldcwd = gettempdir()
    try:
        if tempdir:
            with TemporaryDirectory() as tempcwd:
                chdir(tempcwd)
                yield Path(tempcwd).resolve()
        elif newcwd:
            chdir(newcwd)
            # Ask for the directory we are in now: resolving a relative
            # ``newcwd`` after the chdir would append it a second time.
            yield Path.cwd().resolve()
        else:
            # Nothing to change into. Path(None) raises TypeError, so
            # report the directory that will be restored on exit.
            yield Path(oldcwd).resolve()
    finally:
        chdir(oldcwd)
74
75
def unzip_file_to_dir(path_to_zip : Union[str, PathLike], output_directory : str) -> None:
    """
    Unpack all members of the ZIP archive ``path_to_zip`` into ``output_directory``.
    """
    with ZipFile(path_to_zip, mode='r') as archive:
        archive.extractall(path=output_directory)
81
82
83
@lru_cache()
def is_git_url(url: str) -> bool:
    """
    Check whether ``url`` can be listed as a Git remote.

    Runs ``git ls-remote --exit-code -q -h <url>`` and reports whether it
    succeeded. Results are memoized per ``url`` for the lifetime of the
    process (``lru_cache``), so a later change of the remote is not seen.

    Returns:
        True if the command exits with status 0. False if it exits
        non-zero or if the ``git`` executable cannot be started.
    """
    try:
        run(['git', 'ls-remote', '--exit-code', '-q', '-h', url], check=True)
    except (CalledProcessError, OSError):
        # OSError (e.g. FileNotFoundError) means git itself could not be
        # executed; without git nothing can be confirmed to be a Git URL.
        return False
    return True
90
91
92
@lru_cache()
def get_ocrd_tool_json(executable : str) -> Dict[str, Any]:
    """
    Look up the ``ocrd-tool`` description of ``executable``.

    The packaged ``ocrd-all-tool.json`` resource is consulted first. If it
    cannot be read or parsed, or has no entry for ``executable``, the
    output of ``<executable> --dump-json`` is parsed instead. If that
    fails as well, the failure is logged and an empty description is used.

    In every case a ``resource_locations`` entry is filled in with
    ``RESOURCE_LOCATIONS`` when the description lacks one. The result is
    memoized per ``executable``.
    """
    try:
        bundled = loads(resource_string('ocrd', 'ocrd-all-tool.json'))
        description = bundled[executable]
    except (JSONDecodeError, OSError, KeyError):
        description = {}
        try:
            dumped = run([executable, '--dump-json'], stdout=PIPE, check=False)
            description = loads(dumped.stdout)
        except (JSONDecodeError, OSError) as e:
            getLogger('ocrd.utils.get_ocrd_tool_json').error(f'{executable} --dump-json produced invalid JSON: {e}')
    if 'resource_locations' not in description:
        description['resource_locations'] = RESOURCE_LOCATIONS
    return description
109
110
111
@lru_cache()
def get_moduledir(executable : str) -> str:
    """
    Look up the module directory of ``executable``.

    The packaged ``ocrd-all-module-dir.json`` resource is consulted first.
    If it cannot be read or parsed, or has no entry for ``executable``,
    the output of ``<executable> --dump-module-dir`` (minus trailing
    newlines) is used. If the executable cannot be run, the failure is
    logged and ``None`` is returned. The result is memoized per
    ``executable``.
    """
    try:
        known_dirs = loads(resource_string('ocrd', 'ocrd-all-module-dir.json'))
        return known_dirs[executable]
    except (JSONDecodeError, OSError, KeyError):
        pass
    try:
        dumped = run([executable, '--dump-module-dir'], encoding='utf-8', stdout=PIPE, check=False)
        return dumped.stdout.rstrip('\n')
    except (JSONDecodeError, OSError) as e:
        getLogger('ocrd.utils.get_moduledir').error(f'{executable} --dump-module-dir failed: {e}')
    return None
123
124
def get_env_locations(executable: str) -> List[str]:
    """
    Return the directories listed in the processor's ``*_PATH`` environment variable.

    The variable name is ``executable`` upper-cased with dashes turned
    into underscores, followed by ``_PATH``. Its value is split on ``:``.
    An unset variable yields an empty list.
    """
    variable = f"{executable.replace('-', '_').upper()}_PATH"
    value = environ.get(variable)
    if value is None:
        return []
    return value.split(':')
129
130
def list_resource_candidates(executable : str, fname : str, cwd : Optional[str] = None, moduled : Optional[str] = None, xdg_data_home : Optional[str] = None) -> List[str]:
    """
    Build the ordered list of paths where the resource ``fname`` of
    processor ``executable`` may live, according to
    https://ocr-d.de/en/spec/ocrd_tool#file-parameters

    Order: ``cwd`` (default: current working directory), each directory
    from the processor's ``*_PATH`` environment variable, the
    ``ocrd-resources/<executable>`` directory below ``xdg_data_home``
    (default: ``config.XDG_DATA_HOME``), the system resource directory,
    and finally ``moduled`` if given. Existence is not checked.
    """
    base_dir = getcwd() if cwd is None else cwd
    data_home = xdg_data_home or config.XDG_DATA_HOME
    paths = [join(base_dir, fname)]
    paths.extend(join(location, fname) for location in get_env_locations(executable))
    paths.append(join(data_home, 'ocrd-resources', executable, fname))
    paths.append(join(RESOURCES_DIR_SYSTEM, executable, fname))
    if moduled:
        paths.append(join(moduled, fname))
    return paths
147
148
def list_all_resources(executable : str, ocrd_tool : Optional[Dict[str, Any]] = None, moduled : Optional[str] = None, xdg_data_home : Optional[str] = None) -> List[str]:
    """
    List all processor resources in the filesystem according to
    https://ocr-d.de/en/spec/ocrd_tool#resource-parameters

    Candidates are the direct children of each directory in the
    processor's ``*_PATH`` environment variable, plus (depending on the
    tool's ``resource_locations``) of the ``data``, ``system`` and
    ``module`` directories. Module files matching a list of code and
    cache patterns are dropped. The remainder is filtered by the media
    types the processor declares for its resource parameters.

    Args:
        executable: name of the processor executable
        ocrd_tool: its ocrd-tool description; looked up via
            :py:func:`get_ocrd_tool_json` if not given
        moduled: module directory of the processor, if known
        xdg_data_home: overrides ``config.XDG_DATA_HOME``

    Returns:
        The accepted candidate paths as strings, sorted.
    """
    xdg_data_home = xdg_data_home or config.XDG_DATA_HOME
    if ocrd_tool is None:
        ocrd_tool = get_ocrd_tool_json(executable)
    # processor we're looking for might not be installed, hence the fallbacks
    try:
        mimetypes = get_processor_resource_types(executable, ocrd_tool=ocrd_tool)
    except KeyError:
        mimetypes = ['*/*']
    try:
        resource_locations = ocrd_tool['resource_locations']
    except KeyError:
        # Assume the default
        resource_locations = RESOURCE_LOCATIONS
    try:
        # fixme: if resources_list contains directories, their "suffix" will interfere
        # (e.g. dirname without dot means we falsely match files without suffix)
        resource_suffixes = [Path(res['name']).suffix
                             for res in ocrd_tool['resources']]
    except KeyError:
        resource_suffixes = []
    logger = getLogger('ocrd.utils.list_all_resources')
    candidates = []
    # The 'cwd' location is deliberately not enumerated: it would list
    # too many false positives, and relative paths are tried w.r.t. CWD
    # prior to list_all_resources resolution anyway.
    for processor_path in get_env_locations(executable):
        processor_path = Path(processor_path)
        if processor_path.is_dir():
            candidates += processor_path.iterdir()
    if 'data' in resource_locations:
        datadir = Path(xdg_data_home, 'ocrd-resources', executable)
        if datadir.is_dir():
            candidates += datadir.iterdir()
    if 'system' in resource_locations:
        systemdir = Path(RESOURCES_DIR_SYSTEM, executable)
        if systemdir.is_dir():
            candidates += systemdir.iterdir()
    if 'module' in resource_locations and moduled:
        # Python distributions do not distinguish between
        # code and data; `is_resource()` only singles out
        # files over directories; but we want data files only
        # todo: more code and cache exclusion patterns!
        ignored_patterns = [
            '*.py', '*.py[cod]', '*~', '.*.swp', '*.swo',
            '__pycache__/*', '*.egg-info/*', '*.egg',
            'copyright.txt', 'LICENSE*', 'README.md', 'MANIFEST',
            'TAGS', '.DS_Store',
            # C extensions
            '*.so',
            # translations
            '*.mo', '*.pot',
            '*.log', '*.orig', '*.BAK',
            '.git/*',
            # our stuff
            'ocrd-tool.json',
            'environment.pickle', 'resource_list.yml']
        # only direct children of the module directory are considered,
        # and subdirectories are skipped (no recursion)
        moduled = Path(moduled)
        for resource in moduled.iterdir():
            if resource.is_dir():
                continue
            if any(resource.match(pattern) for pattern in ignored_patterns):
                logger.debug("ignoring module candidate '%s'", resource)
                continue
            candidates.append(resource)
    if mimetypes != ['*/*']:
        logger.debug("matching candidates for %s by content-type %s", executable, str(mimetypes))
    def valid_resource_type(path):
        # decide whether ``path`` fits the media types the processor declares
        if '*/*' in mimetypes:
            return True
        if path.is_dir():
            if not 'text/directory' in mimetypes:
                logger.debug("ignoring directory candidate '%s'", path)
                return False
            if path.name in ['.git']:
                logger.debug("ignoring directory candidate '%s'", path)
                return False
            return True
        if not path.is_file():
            logger.warning("ignoring non-file, non-directory candidate '%s'", path)
            return False
        res_mimetype = guess_media_type(path, fallback='')
        if res_mimetype == 'application/json':
            # always accept, regardless of configured mimetypes:
            # needed for distributing or sharing parameter preset files
            return True
        if ['text/directory'] == mimetypes:
            logger.debug("ignoring non-directory candidate '%s'", path)
            return False
        if 'application/octet-stream' in mimetypes:
            # catch-all type - do not enforce anything
            return True
        if path.suffix in resource_suffixes:
            return True
        if any(path.suffix == MIME_TO_EXT.get(mime, None)
               for mime in mimetypes):
            return True
        if not res_mimetype:
            logger.warning("cannot determine content type of candidate '%s'", path)
            return True
        if any(apply_glob([res_mimetype], mime)
               for mime in mimetypes):
            return True
        logger.debug("ignoring %s candidate '%s'", res_mimetype, path)
        return False
    candidates = sorted(filter(valid_resource_type, candidates))
    # return a real list as annotated, not a one-shot ``map`` iterator
    return [str(candidate) for candidate in candidates]
263
264
def get_processor_resource_types(executable : str, ocrd_tool : Optional[Dict[str, Any]] = None) -> List[str]:
    """
    Collect the media types of the resource parameters a processor declares.

    A parameter counts as a resource if its ``type`` is ``string``, its
    ``format`` is ``uri`` and it has a ``content-type``. The latter is
    split on commas.

    Return the list of MIME types, or ``['*/*']`` (arbitrary files or
    directories allowed) when the processor is not installed or declares
    no such parameter.
    """
    if not ocrd_tool:
        # if the processor in question is not installed, assume both files and directories
        if not which(executable):
            return ['*/*']
        ocrd_tool = get_ocrd_tool_json(executable)
    declared = []
    for spec in ocrd_tool.get('parameters', {}).values():
        if spec['type'] != 'string':
            continue
        if spec.get('format', '') != 'uri' or 'content-type' not in spec:
            continue
        declared.extend(spec['content-type'].split(','))
    # No parameter of this processor is a resource (or the resource
    # types are not properly declared), so allow directories and files
    return declared if declared else ['*/*']
286
287
288
# ht @pabs3
289
# https://github.com/untitaker/python-atomicwrites/issues/42
290
class AtomicWriterPerms(AtomicWriter):
    """
    ``AtomicWriter`` that sets the permission bits of the file object it
    hands out: those of the existing target file, or ``0o664`` reduced by
    the current umask when the target does not exist yet.
    """

    def get_fileobject(self, **kwargs):
        fileobj = super().get_fileobject(**kwargs)
        try:
            perms = stat(self._path).st_mode
        except FileNotFoundError:
            # Creating a new file, emulate what os.open() does.
            # The umask can only be read by setting it, so set a dummy
            # value and restore the previous one right away.
            current_mask = umask(0)
            umask(current_mask)
            perms = 0o664 & ~current_mask
        chmod(fileobj.fileno(), perms)
        return fileobj
303
304
305
@contextmanager
def atomic_write(fpath : str) -> Iterator[str]:
    """
    Context manager yielding a file object for atomically (over)writing
    ``fpath``, using :py:class:`AtomicWriterPerms` as the writer class.
    """
    writer = atomic_write_(fpath, writer_cls=AtomicWriterPerms, overwrite=True)
    with writer as handle:
        yield handle
309
310
311
def is_file_in_directory(directory : Union[str, PathLike], file : Union[str, PathLike]) -> bool:
    """
    Return True if the path components of ``directory`` form a prefix of
    the path components of ``file``.

    The comparison is purely lexical; neither path is resolved.
    """
    dir_parts = Path(directory).parts
    file_parts = Path(file).parts
    return file_parts[:len(dir_parts)] == dir_parts
318
319
def itertree(path : Union[str, PathLike]) -> PathLike:
    """
    Recursively walk ``path``, yielding :py:class:`pathlib.Path` objects.

    The contents of a directory are yielded before the directory itself;
    ``path`` comes last.
    """
    node = path if isinstance(path, Path) else Path(path)
    if node.is_dir():
        for child in node.iterdir():
            for descendant in itertree(child):
                yield descendant
    yield node
329
330
def directory_size(path : Union[str, PathLike]) -> int:
    """
    Add up the sizes in bytes of all regular files below directory ``path``
    (at any depth).
    """
    total = 0
    for entry in Path(path).glob('**/*'):
        if entry.is_file():
            total += entry.stat().st_size
    return total
336
337
def guess_media_type(input_file : str, fallback : Optional[str] = None, application_xml : str = 'application/xml') -> str:
    """
    Guess the media type of a file path

    Tries, in order: the ``filetype`` package, the standard library's
    :py:func:`mimetypes.guess_type`, and a lookup of the concatenated
    file suffixes in ``EXT_TO_MIME``.

    Args:
        input_file: path of the file to inspect
        fallback: media type to use if none of the guesses succeeds
        application_xml: value to return instead of ``application/xml``

    Returns:
        The guessed media type.

    Raises:
        ValueError: if no guess succeeds and ``fallback`` is ``None``.
    """
    mimetype = filetype_guess(input_file)
    if mimetype is not None:
        mimetype = mimetype.mime
    else:
        mimetype = mimetypes_guess(input_file)[0]
    if mimetype is None:
        mimetype = EXT_TO_MIME.get(''.join(Path(input_file).suffixes), fallback)
    if mimetype is None:
        # Interpolate here: exceptions do not apply %-style arguments
        # the way logging calls do.
        raise ValueError("Could not determine MIME type of input_file '%s'" % str(input_file))
    if mimetype == 'application/xml':
        mimetype = application_xml
    return mimetype
353
354
355
@contextmanager
def redirect_stderr_and_stdout_to_file(filename):
    """
    Context manager that appends everything written to ``sys.stdout`` and
    ``sys.stderr`` inside the context to the UTF-8 text file ``filename``.
    """
    with open(filename, 'at', encoding='utf-8') as logfile, \
            redirect_stderr(logfile), redirect_stdout(logfile):
        yield
360