Passed
Push — master ( ceb999...3df4a1 )
by Konstantin
02:15
created

ocrd_utils.os.get_moduledir()   A

Complexity

Conditions 2

Size

Total Lines 8
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 8
dl 0
loc 8
rs 10
c 0
b 0
f 0
cc 2
nop 1
1
"""
2
Operating system functions.
3
"""
4
# Names exported by a star-import of this module.
# NOTE(review): list_resource_candidates, list_all_resources, itertree and
# AtomicWriterPerms are defined in this module but not listed here -- confirm
# whether that is intentional.
__all__ = [
    'abspath',
    'directory_size',
    'is_file_in_directory',
    'get_ocrd_tool_json',
    'get_moduledir',
    'get_processor_resource_types',
    'pushd_popd',
    'unzip_file_to_dir',
    'atomic_write',
]
15
16
from tempfile import TemporaryDirectory, gettempdir
17
from functools import lru_cache
18
import contextlib
19
from distutils.spawn import find_executable as which
20
from json import loads
21
from json.decoder import JSONDecodeError
22
from os import getcwd, chdir, stat, chmod, umask, environ
23
from pathlib import Path
24
from os.path import exists, abspath as abspath_, join, isdir
25
from zipfile import ZipFile
26
from subprocess import run, PIPE
27
28
from atomicwrites import atomic_write as atomic_write_, AtomicWriter
29
30
from .constants import XDG_DATA_HOME
31
from .logging import getLogger
32
33
def abspath(url):
    """
    Return the absolute filesystem path for a path or ``file://`` URL.

    A leading ``file://`` scheme is removed before the remainder is
    passed to ``os.path.abspath``.
    """
    scheme = 'file://'
    if not url.startswith(scheme):
        return abspath_(url)
    return abspath_(url[len(scheme):])
42
43
@contextlib.contextmanager
def pushd_popd(newcwd=None, tempdir=False):
    """
    Temporarily change the working directory and restore it on exit.

    Args:
        newcwd: Directory to change into. If falsy and ``tempdir`` is not
            set, the working directory is left unchanged.
        tempdir: If true, create a temporary directory, change into it and
            remove it again when the context exits.

    Yields:
        pathlib.Path: The resolved directory the context runs in. When
        neither argument is given, this is the directory that will be
        restored on exit.

    Raises:
        ValueError: If both ``newcwd`` and ``tempdir`` are given.
    """
    if newcwd and tempdir:
        raise ValueError("pushd_popd can accept either newcwd or tempdir, not both")
    try:
        oldcwd = getcwd()
    except FileNotFoundError:
        # The current directory no longer exists (e.g. it was deleted while
        # we were inside it), so fall back to the system temp directory.
        oldcwd = gettempdir()
    try:
        if tempdir:
            with TemporaryDirectory() as tempcwd:
                chdir(tempcwd)
                yield Path(tempcwd).resolve()
        elif newcwd:
            # Resolve *before* changing directory: resolving a relative path
            # afterwards would interpret it relative to the new directory.
            target = Path(newcwd).resolve()
            chdir(target)
            yield target
        else:
            # Nothing to change into; Path(None) would raise TypeError.
            yield Path(oldcwd).resolve()
    finally:
        chdir(oldcwd)
63
64
def unzip_file_to_dir(path_to_zip, output_directory):
    """
    Extract a ZIP archive to a directory.

    Args:
        path_to_zip: Path of the ZIP archive to read.
        output_directory: Directory all archive members are extracted into.
    """
    # The context manager closes the archive even if extraction fails.
    with ZipFile(path_to_zip, 'r') as archive:
        archive.extractall(output_directory)
71
72
@lru_cache()
def get_ocrd_tool_json(executable):
    """
    Get the ``ocrd-tool`` description of ``executable``.

    Runs ``executable --dump-json`` and parses its standard output as JSON.
    If the executable cannot be started or its output is not valid JSON,
    the error is logged and an empty description is used instead.

    The description always has a ``resource_locations`` entry; if the tool
    does not declare one, the default ``['data', 'cwd', 'system', 'module']``
    is filled in.

    The result is cached per ``executable``, so every call with the same
    argument returns the same dict object.
    """
    log = getLogger('ocrd_utils.get_ocrd_tool_json')
    try:
        ocrd_tool = loads(run([executable, '--dump-json'], stdout=PIPE).stdout)
    except JSONDecodeError as e:
        log.error(f'{executable} --dump-json produced invalid JSON: {e}')
        ocrd_tool = {}
    except OSError as e:
        # The executable could not be started at all (missing, not
        # executable, ...), which is not a JSON problem.
        log.error(f'{executable} --dump-json could not be run: {e}')
        ocrd_tool = {}
    if 'resource_locations' not in ocrd_tool:
        ocrd_tool['resource_locations'] = ['data', 'cwd', 'system', 'module']
    return ocrd_tool
86
87
@lru_cache()
def get_moduledir(executable):
    """
    Get the module directory reported by ``executable``.

    Runs ``executable --dump-module-dir`` and returns its standard output
    with trailing newlines removed. If the executable cannot be started,
    the error is logged and ``None`` is returned.

    The result is cached per ``executable``.
    """
    try:
        completed = run([executable, '--dump-module-dir'], encoding='utf-8', stdout=PIPE)
    except OSError as e:
        # No JSON is parsed here, so OSError is the only failure to expect
        # from starting the process.
        getLogger('ocrd_utils.get_moduledir').error(f'{executable} --dump-module-dir failed: {e}')
        return None
    return completed.stdout.rstrip('\n')
95
96
def list_resource_candidates(executable, fname, cwd=None, moduled=None, xdg_data_home=None):
    """
    Generate candidates for processor resources according to
    https://ocr-d.de/en/spec/ocrd_tool#file-parameters

    Args:
        executable: Name of the processor executable.
        fname: File name of the resource to look for.
        cwd: Directory used for the first candidate. Defaults to the
            working directory at the time of the call.
        moduled: Optional module directory; if given, it provides the
            last candidate.
        xdg_data_home: Data home directory; defaults to ``XDG_DATA_HOME``.

    Returns:
        list: Candidate paths in lookup order: ``cwd``, the directories in
        the ``<EXECUTABLE>_PATH`` environment variable (if set), the data
        home, the system-wide location and finally ``moduled``.
    """
    if cwd is None:
        # Look this up per call; a ``cwd=getcwd()`` default would be frozen
        # at import time.
        cwd = getcwd()
    xdg_data_home = xdg_data_home or XDG_DATA_HOME
    candidates = [join(cwd, fname)]
    processor_path_var = '%s_PATH' % executable.replace('-', '_').upper()
    if processor_path_var in environ:
        candidates += [join(x, fname) for x in environ[processor_path_var].split(':')]
    candidates.append(join(xdg_data_home, 'ocrd-resources', executable, fname))
    candidates.append(join('/usr/local/share/ocrd-resources', executable, fname))
    if moduled:
        candidates.append(join(moduled, fname))
    return candidates
112
113
def list_all_resources(executable, moduled=None, xdg_data_home=None):
    """
    List all processor resources in the filesystem according to
    https://ocr-d.de/en/spec/ocrd_tool#file-parameters

    Args:
        executable: Name of the processor executable.
        moduled: Optional module directory to search (only used if the
            tool's ``resource_locations`` contain ``module``).
        xdg_data_home: Data home directory; defaults to ``XDG_DATA_HOME``.

    Returns:
        list: Sorted list of path strings. Directories found in any
        location are listed themselves and additionally expanded to all of
        their descendants, except that directories named ``.git`` are not
        entered.
    """
    # Python distributions do not distinguish between code and data;
    # `is_resource()` only singles out files over directories; but we
    # want data files only.
    # todo: more code and cache exclusion patterns!
    excluded_patterns = ('*.py', '*.py[cod]', '*~', 'ocrd-tool.json',
                         'environment.pickle', 'resource_list.yml', 'lib.bash')
    candidates = []
    try:
        resource_locations = get_ocrd_tool_json(executable)['resource_locations']
    except FileNotFoundError:
        # The processor whose resource_locations we are looking for is not
        # installed. Assume the default.
        resource_locations = ['data', 'cwd', 'system', 'module']
    xdg_data_home = xdg_data_home or XDG_DATA_HOME
    # XXX The 'cwd' location is deliberately not searched: it would list
    # too many false positives.
    processor_path_var = '%s_PATH' % executable.replace('-', '_').upper()
    if processor_path_var in environ:
        for processor_path in environ[processor_path_var].split(':'):
            if Path(processor_path).is_dir():
                candidates += Path(processor_path).iterdir()
    if 'data' in resource_locations:
        datadir = Path(xdg_data_home, 'ocrd-resources', executable)
        if datadir.is_dir():
            candidates += datadir.iterdir()
    if 'system' in resource_locations:
        systemdir = Path('/usr/local/share/ocrd-resources', executable)
        if systemdir.is_dir():
            candidates += systemdir.iterdir()
    if 'module' in resource_locations and moduled:
        # recurse fully, keeping data files only
        for resource in itertree(Path(moduled)):
            if resource.is_dir():
                continue
            if any(resource.match(pattern) for pattern in excluded_patterns):
                continue
            candidates.append(resource)
    # Expand directories found so far. The previous implementation extended
    # the list while iterating over it, which descends through all levels;
    # an explicit worklist keeps exactly that result without mutating the
    # sequence being iterated.
    pending = list(candidates)
    while pending:
        parent = pending.pop()
        if parent.is_dir() and parent.name != '.git':
            children = list(parent.iterdir())
            candidates += children
            pending += children
    return sorted(str(x) for x in candidates)
163
164
def get_processor_resource_types(executable, ocrd_tool=None):
    """
    Determine which kinds of resource parameters a processor takes.

    Returns the ``content-type`` values declared by the parameters in the
    processor's ocrd-tool description, or ``['*/*']`` (arbitrary files or
    directories) if no parameter declares one.
    """
    if not ocrd_tool:
        if not which(executable):
            # Processor is not installed: assume both files and directories.
            return ['*/*']
        ocrd_tool = get_ocrd_tool_json(executable)
    declared_types = [
        parameter['content-type']
        for parameter in ocrd_tool.get('parameters', {}).values()
        if 'content-type' in parameter
    ]
    # Without any declared resource parameter (or if they are not properly
    # declared), allow both directories and files.
    return declared_types if declared_types else ['*/*']
183
184
# ht @pabs3
185
# https://github.com/untitaker/python-atomicwrites/issues/42
186
class AtomicWriterPerms(AtomicWriter):
    """
    ``AtomicWriter`` variant that sets the permission bits of the file
    object it hands out.
    """

    def get_fileobject(self, **kwargs):
        """
        Return the file object from the parent class after applying the
        mode of the existing target file, or ``0o664`` minus the process
        umask if the target does not exist yet.
        """
        fileobj = super().get_fileobject(**kwargs)
        try:
            permissions = stat(self._path).st_mode
        except FileNotFoundError:
            # Creating a new file, emulate what os.open() does.
            # umask() can only be read by setting it, so restore it at once.
            current_umask = umask(0)
            umask(current_umask)
            permissions = 0o664 & ~current_umask
        chmod(fileobj.fileno(), permissions)
        return fileobj
199
200
@contextlib.contextmanager
def atomic_write(fpath):
    """
    Context manager yielding a file object for atomically writing ``fpath``.

    Existing files are overwritten; ``AtomicWriterPerms`` is used as the
    writer class.
    """
    writer = atomic_write_(fpath, writer_cls=AtomicWriterPerms, overwrite=True)
    with writer as handle:
        yield handle
204
205
206
def is_file_in_directory(directory, file):
    """
    Return True if ``file`` is in ``directory`` (by checking that the
    components of ``directory`` are the leading components of ``file``).

    Both paths are normalized lexically first, so ``..`` components cannot
    be used to make a path outside of ``directory`` look as if it were
    inside. The filesystem is not consulted; symbolic links are not
    resolved.
    """
    from os.path import normpath

    directory_parts = Path(normpath(directory)).parts
    file_parts = Path(normpath(file)).parts
    return file_parts[:len(directory_parts)] == directory_parts
213
214
def itertree(path):
    """
    Recursively enumerate ``path``, yielding the contents of every
    directory before the directory itself.
    """
    root = path if isinstance(path, Path) else Path(path)
    if root.is_dir():
        for child in root.iterdir():
            for descendant in itertree(child):
                yield descendant
    yield root
224
225
def directory_size(path):
    """
    Calculate the total size in bytes of all files below directory ``path``.
    """
    total = 0
    for entry in Path(path).glob('**/*'):
        if entry.is_file():
            total += entry.stat().st_size
    return total
231