Passed
Pull Request — master (#800)
by Konstantin
02:22
created

ocrd_utils.os.itertree()   A

Complexity

Conditions 4

Size

Total Lines 10
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 7
dl 0
loc 10
rs 10
c 0
b 0
f 0
cc 4
nop 1
1
"""
2
Operating system functions.
3
"""
4
# Names exported by ``from <this module> import *``. Every entry is a
# function defined in this module.
__all__ = [
    'abspath',
    'directory_size',
    'is_file_in_directory',
    'get_ocrd_tool_json',
    'get_processor_resource_types',
    'pushd_popd',
    'unzip_file_to_dir',
    'atomic_write',
    # public resource-listing helpers that were previously not exported
    'itertree',
    'list_all_resources',
    'list_resource_candidates',
]
14
15
from tempfile import TemporaryDirectory
16
from functools import lru_cache
17
import contextlib
18
from distutils.spawn import find_executable as which
19
from json import loads
20
from json.decoder import JSONDecodeError
21
from os import getcwd, chdir, stat, chmod, umask, environ
22
from pathlib import Path
23
from os.path import exists, abspath as abspath_, join, isdir
24
from zipfile import ZipFile
25
from subprocess import run, PIPE
26
27
from atomicwrites import atomic_write as atomic_write_, AtomicWriter
28
29
from .constants import XDG_DATA_HOME
30
from .logging import getLogger
31
32
def abspath(url):
    """
    Resolve a local path or ``file://`` URL to an absolute path.

    A leading ``file://`` scheme is stripped before the remainder is
    handed to ``os.path.abspath``.
    """
    scheme = 'file://'
    local_path = url[len(scheme):] if url.startswith(scheme) else url
    return abspath_(local_path)
41
42
@contextlib.contextmanager
def pushd_popd(newcwd=None, tempdir=False):
    """
    Context manager that changes the working directory and restores it on exit.

    Arguments:
        newcwd: directory to change into; if falsy, the working directory
            is left unchanged and ``newcwd`` itself is yielded.
        tempdir: if true, create a ``TemporaryDirectory``, change into it
            and yield its path; the directory is removed on exit.

    Yields:
        The path of the temporary directory if ``tempdir`` is set,
        otherwise ``newcwd``.

    Raises:
        Exception: if both ``newcwd`` and ``tempdir`` are given.
    """
    if newcwd and tempdir:
        raise Exception("pushd_popd can accept either newcwd or tempdir, not both")
    try:
        oldcwd = getcwd()
    except FileNotFoundError:
        # This happens when a directory is deleted before the context is exited.
        # Fall back to the platform's temporary directory instead of a
        # hard-coded '/tmp', which need not exist and ignores TMPDIR.
        from tempfile import gettempdir
        oldcwd = gettempdir()
    try:
        if tempdir:
            with TemporaryDirectory() as tempcwd:
                chdir(tempcwd)
                yield tempcwd
        else:
            if newcwd:
                chdir(newcwd)
            yield newcwd
    finally:
        # Always return to where we started, even if the body raised
        chdir(oldcwd)
62
63
def unzip_file_to_dir(path_to_zip, output_directory):
    """
    Extract a ZIP archive to a directory

    Arguments:
        path_to_zip: path of the ZIP archive to read.
        output_directory: directory the archive members are extracted into.
    """
    # The context manager closes the archive even if extraction fails
    with ZipFile(path_to_zip, 'r') as archive:
        archive.extractall(output_directory)
70
71
@lru_cache()
def get_ocrd_tool_json(executable):
    """
    Get the ``ocrd-tool`` description of ``executable``.

    Runs ``executable --dump-json`` and parses its standard output as JSON.
    If the process cannot be started or the output is not valid JSON, the
    error is logged and an empty description is used instead. In either
    case a ``resource_locations`` entry is guaranteed to be present in the
    result.

    The result is cached per ``executable``, so all callers receive the
    same dict object.
    """
    try:
        ocrd_tool = loads(run([executable, '--dump-json'], stdout=PIPE).stdout)
    except (JSONDecodeError, OSError) as e:
        getLogger('ocrd_utils.get_ocrd_tool_json').error(f'{executable} --dump-json produced invalid JSON: {e}')
        ocrd_tool = {}
    # Tools that do not declare their resource locations get all of them
    ocrd_tool.setdefault('resource_locations', ['data', 'cwd', 'system', 'module'])
    return ocrd_tool
85
86
# def get_processor_resources_list(executable):
87
#     """
88
#     Get the list of resources that a processor is providing via
89
#     ``-list-resources``
90
#     """
91
92
def list_resource_candidates(executable, fname, cwd=None, moduled=None, xdg_data_home=None):
    """
    Generate candidates for processor resources according to
    https://ocr-d.de/en/spec/ocrd_tool#file-parameters

    Arguments:
        executable: name of the processor executable.
        fname: file name of the resource to look for.
        cwd: directory to use as the working-directory location; defaults
            to the current working directory at call time.
        moduled: optional module directory of the processor.
        xdg_data_home: optional override for ``XDG_DATA_HOME``.

    Returns:
        A list of candidate paths in lookup order: working directory,
        each entry of the ``<EXECUTABLE>_PATH`` environment variable (if
        set), the user data directory, the system directory and finally
        the module directory (if given). The paths are not checked for
        existence.
    """
    # Resolve the default here rather than in the signature: a default of
    # ``getcwd()`` would be frozen at import time and ignore later chdir().
    if cwd is None:
        cwd = getcwd()
    xdg_data_home = xdg_data_home or XDG_DATA_HOME
    candidates = [join(cwd, fname)]
    # e.g. ``ocrd-foo-bar`` -> ``OCRD_FOO_BAR_PATH``
    processor_path_var = '%s_PATH' % executable.replace('-', '_').upper()
    if processor_path_var in environ:
        candidates += [join(x, fname) for x in environ[processor_path_var].split(':')]
    candidates.append(join(xdg_data_home, 'ocrd-resources', executable, fname))
    candidates.append(join('/usr/local/share/ocrd-resources', executable, fname))
    if moduled:
        candidates.append(join(moduled, fname))
    return candidates
108
109
def list_all_resources(executable, moduled=None, xdg_data_home=None):
    """
    List all processor resources in the filesystem according to
    https://ocr-d.de/en/spec/ocrd_tool#file-parameters

    Arguments:
        executable: name of the processor executable.
        moduled: optional module directory of the processor; searched
            recursively for data files.
        xdg_data_home: optional override for ``XDG_DATA_HOME``.

    Returns:
        A sorted list of path strings. Directories found at the top level
        of a location are listed together with their direct children.
    """
    candidates = []
    try:
        resource_locations = get_ocrd_tool_json(executable)['resource_locations']
    except FileNotFoundError:
        # processor we're looking for resource_locations of is not installed.
        # Assume the default
        resource_locations = ['data', 'cwd', 'system', 'module']
    xdg_data_home = XDG_DATA_HOME if not xdg_data_home else xdg_data_home
    # XXX The 'cwd' location is deliberately not listed: it would produce
    # too many false positives.
    # Directories from the <EXECUTABLE>_PATH environment variable are listed
    # regardless of the declared resource locations.
    processor_path_var = '%s_PATH' % executable.replace('-', '_').upper()
    if processor_path_var in environ:
        for processor_path in environ[processor_path_var].split(':'):
            if Path(processor_path).is_dir():
                candidates += Path(processor_path).iterdir()
    if 'data' in resource_locations:
        datadir = Path(xdg_data_home, 'ocrd-resources', executable)
        if datadir.is_dir():
            candidates += datadir.iterdir()
    if 'system' in resource_locations:
        systemdir = Path('/usr/local/share/ocrd-resources', executable)
        if systemdir.is_dir():
            candidates += systemdir.iterdir()
    if 'module' in resource_locations and moduled:
        # Python distributions do not distinguish between
        # code and data; `is_resource()` only singles out
        # files over directories; but we want data files only
        # todo: more code and cache exclusion patterns!
        excluded_patterns = ['*.py', '*.py[cod]', '*~', 'ocrd-tool.json',
                             'environment.pickle', 'resource_list.yml', 'lib.bash']
        # recurse fully
        for resource in itertree(Path(moduled)):
            if resource.is_dir():
                continue
            if any(resource.match(pattern) for pattern in excluded_patterns):
                continue
            candidates.append(resource)
    # recurse once: iterate over a snapshot, because extending the list
    # while looping over it would also visit the newly added directories
    # and thereby descend to arbitrary depth
    for parent in list(candidates):
        if parent.is_dir() and parent.name != '.git':
            candidates += parent.iterdir()
    return sorted(str(x) for x in candidates)
159
160
def get_processor_resource_types(executable, ocrd_tool=None):
    """
    Determine what type of resource parameters a processor needs.

    Arguments:
        executable: name of the processor executable.
        ocrd_tool: optional, already loaded ``ocrd-tool`` description of
            the processor; if omitted, it is obtained from ``executable``.

    Return a list of MIME types (with the special value `*/*` to
    designate that arbitrary files or directories are allowed).
    """
    if not ocrd_tool:
        # if the processor in question is not installed, assume both files and directories
        if not which(executable):
            return ['*/*']
        # Only query the executable when the caller did not supply a
        # description; a supplied ``ocrd_tool`` must not be overwritten.
        ocrd_tool = get_ocrd_tool_json(executable)
    content_types = [p['content-type'] for p in ocrd_tool.get('parameters', {}).values()
                     if 'content-type' in p]
    # None of the parameters for this processor are resources (or the
    # resource parameters are not properly declared), so output both
    # directories and files
    return content_types or ['*/*']
179
180
# ht @pabs3
181
# https://github.com/untitaker/python-atomicwrites/issues/42
182
class AtomicWriterPerms(AtomicWriter):
    """
    ``AtomicWriter`` variant that sets the permission bits of the file
    object it hands out.
    """

    def get_fileobject(self, **kwargs):
        """
        Return the parent's file object after applying permissions to it.

        If the target path already exists, its mode is reused; otherwise
        ``0o664`` filtered by the current umask is applied.
        """
        fileobj = super().get_fileobject(**kwargs)
        try:
            perms = stat(self._path).st_mode
        except FileNotFoundError:
            # Creating a new file, emulate what os.open() does.
            # umask() can only be read by setting it, so set a dummy
            # value and restore the previous one right away.
            current_umask = umask(0)
            umask(current_umask)
            perms = 0o664 & ~current_umask
        chmod(fileobj.fileno(), perms)
        return fileobj
195
196
@contextlib.contextmanager
def atomic_write(fpath):
    """
    Open ``fpath`` for atomic writing and yield the file object.

    Delegates to ``atomicwrites.atomic_write`` with overwriting enabled and
    ``AtomicWriterPerms`` as the writer class.
    """
    writer = atomic_write_(fpath, writer_cls=AtomicWriterPerms, overwrite=True)
    with writer as handle:
        yield handle
200
201
202
def is_file_in_directory(directory, file):
    """
    Tell whether ``file`` lies below ``directory``.

    The check is purely lexical: the path components of ``directory`` must
    be a prefix of the path components of ``file``.
    """
    dir_parts = Path(directory).parts
    file_parts = Path(file).parts
    return file_parts[:len(dir_parts)] == dir_parts
209
210
def itertree(path):
    """
    Recursively enumerate ``path``, yielding ``Path`` objects.

    The contents of a directory are yielded before the directory itself;
    ``path`` is always the last item.
    """
    root = path if isinstance(path, Path) else Path(path)
    if root.is_dir():
        for child in root.iterdir():
            for descendant in itertree(child):
                yield descendant
    yield root
220
221
def directory_size(path):
    """
    Calculate the total size in bytes of all files below directory ``path``
    """
    total = 0
    for entry in Path(path).glob('**/*'):
        if entry.is_file():
            total += entry.stat().st_size
    return total
227