Passed
Pull Request — master (#1258)
by Konstantin
02:35
created

ocrd_utils.os   F

Complexity

Total Complexity 61

Size/Duplication

Total Lines 268
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 61
eloc 184
dl 0
loc 268
rs 3.52
c 0
b 0
f 0

14 Functions

Rating   Name   Duplication   Size   Complexity  
A itertree() 0 10 4
A list_resource_candidates() 0 16 4
B pushd_popd() 0 20 7
A get_ocrd_tool_json() 0 18 4
A get_processor_resource_types() 0 19 4
A unzip_file_to_dir() 0 7 1
A atomic_write() 0 4 2
A abspath() 0 9 2
F list_all_resources() 0 50 18
A guess_media_type() 0 16 5
A is_file_in_directory() 0 7 1
A redirect_stderr_and_stdout_to_file() 0 5 3
A get_moduledir() 0 12 3
A directory_size() 0 6 1

1 Method

Rating   Name   Duplication   Size   Complexity  
A AtomicWriterPerms.get_fileobject() 0 12 2

How to fix   Complexity   

Complexity

Complex classes like ocrd_utils.os often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
Operating system functions.
3
"""
4
# Names exported by a star-import of this module.
__all__ = [
    'abspath', 'directory_size', 'is_file_in_directory',
    'get_ocrd_tool_json', 'get_moduledir', 'get_processor_resource_types',
    'guess_media_type', 'pushd_popd', 'unzip_file_to_dir',
    'atomic_write', 'redirect_stderr_and_stdout_to_file',
]
17
18
from tempfile import TemporaryDirectory, gettempdir
19
from functools import lru_cache
20
from contextlib import contextmanager, redirect_stderr, redirect_stdout
21
from shutil import which
22
from json import loads
23
from json.decoder import JSONDecodeError
24
from os import getcwd, chdir, stat, chmod, umask, environ
25
from pathlib import Path
26
from os.path import abspath as abspath_, join
27
from zipfile import ZipFile
28
from subprocess import run, PIPE
29
from mimetypes import guess_type as mimetypes_guess
30
from filetype import guess as filetype_guess
31
32
from atomicwrites import atomic_write as atomic_write_, AtomicWriter
33
34
from .constants import EXT_TO_MIME
35
from .config import config
36
from .logging import getLogger
37
from .introspect import resource_string
38
39
def abspath(url):
    """
    Get a full path to a file or file URL

    A leading ``file://`` scheme is stripped; the remainder is passed to
    :func:`os.path.abspath`.

    Args:
        url (str or os.PathLike): file system path or ``file://`` URL

    Returns:
        str: the absolute, normalised path
    """
    # Function-scope import so the block does not rely on the module import list
    from os import fspath
    # Path-like objects (e.g. pathlib.Path) have no startswith(), so convert first
    url = fspath(url)
    if url.startswith('file://'):
        url = url[len('file://'):]
    return abspath_(url)
48
49
@contextmanager
def pushd_popd(newcwd=None, tempdir=False):
    """
    Context manager that temporarily changes the working directory.

    On exit (normal or exceptional) the process changes back to the
    directory that was current on entry.

    Args:
        newcwd (str or os.PathLike, optional): directory to change into
        tempdir (bool): if true, create a temporary directory, change into
            it, and remove it again when the context exits

    Yields:
        pathlib.Path: resolved path of the directory the body runs in. If
        neither ``newcwd`` nor ``tempdir`` is given, no directory change
        happens and the directory recorded on entry is yielded.

    Raises:
        Exception: if both ``newcwd`` and ``tempdir`` are given
    """
    if newcwd and tempdir:
        raise Exception("pushd_popd can accept either newcwd or tempdir, not both")
    try:
        oldcwd = getcwd()
    except FileNotFoundError:
        # This happens when a directory is deleted before the context is exited
        oldcwd = gettempdir()
    try:
        if tempdir:
            with TemporaryDirectory() as tempcwd:
                chdir(tempcwd)
                yield Path(tempcwd).resolve()
        elif newcwd:
            # Resolve BEFORE changing directory: a relative path resolved
            # after chdir() would be interpreted relative to the new cwd
            target = Path(newcwd).resolve()
            chdir(newcwd)
            yield target
        else:
            # Nothing to change into; Path(None) would raise TypeError
            yield Path(oldcwd).resolve()
    finally:
        chdir(oldcwd)
69
70
def unzip_file_to_dir(path_to_zip, output_directory):
    """
    Extract a ZIP archive to a directory

    Args:
        path_to_zip: path of the ZIP archive to read
        output_directory: directory to extract all members into
    """
    # The context manager closes the archive even if extraction fails
    with ZipFile(path_to_zip, 'r') as archive:
        archive.extractall(output_directory)
77
78
@lru_cache()
def get_ocrd_tool_json(executable):
    """
    Get the ``ocrd-tool`` description of ``executable``.

    The packaged ``ocrd-all-tool.json`` resource is tried first. If it cannot
    be read or parsed, or has no entry for ``executable``, the executable is
    run with ``--dump-json`` and its standard output is parsed instead. When
    that fails too, the error is logged and an otherwise empty description is
    used. A missing ``resource_locations`` entry is filled with the default
    list. Results are cached per ``executable`` by ``lru_cache``.
    """
    tool_json = {}
    # NOTE(review): the base name is computed but the lookup below uses the
    # argument as passed - confirm whether that is intended
    executable_name = Path(executable).name
    try:
        all_tools = loads(resource_string('ocrd', 'ocrd-all-tool.json'))
        tool_json = all_tools[executable]
    except (JSONDecodeError, OSError, KeyError):
        try:
            completed = run([executable, '--dump-json'], stdout=PIPE)
            tool_json = loads(completed.stdout)
        except (JSONDecodeError, OSError) as e:
            log = getLogger('ocrd.utils.get_ocrd_tool_json')
            log.error(f'{executable} --dump-json produced invalid JSON: {e}')
    if 'resource_locations' not in tool_json:
        tool_json['resource_locations'] = ['data', 'cwd', 'system', 'module']
    return tool_json
96
97
@lru_cache()
def get_moduledir(executable):
    """
    Get the module directory of ``executable``.

    The packaged ``ocrd-all-module-dir.json`` resource is tried first. If it
    cannot be read or parsed, or has no entry for ``executable``, the
    executable is run with ``--dump-module-dir`` and its standard output
    (minus trailing newlines) is returned. If that fails with an ``OSError``
    the error is logged and ``None`` is returned. Results are cached per
    ``executable`` by ``lru_cache``.
    """
    try:
        known_dirs = loads(resource_string('ocrd', 'ocrd-all-module-dir.json'))
        return known_dirs[executable]
    except (JSONDecodeError, OSError, KeyError):
        # fall through to asking the executable itself
        pass
    try:
        completed = run([executable, '--dump-module-dir'], encoding='utf-8', stdout=PIPE)
        return completed.stdout.rstrip('\n')
    except (JSONDecodeError, OSError) as e:
        getLogger('ocrd.utils.get_moduledir').error(f'{executable} --dump-module-dir failed: {e}')
    return None
109
110
def list_resource_candidates(executable, fname, cwd=None, moduled=None, xdg_data_home=None):
    """
    Generate candidates for processor resources according to
    https://ocr-d.de/en/spec/ocrd_tool#file-parameters

    Args:
        executable (str): name of the processor executable
        fname (str): file name of the resource
        cwd (str, optional): directory for the first candidate; defaults to
            the working directory at the time of the call
        moduled (str, optional): module directory, appended as last location
        xdg_data_home (str, optional): overrides ``config.XDG_DATA_HOME``

    Returns:
        list of str: candidate paths in this order: ``cwd``, each entry of
        the ``<EXECUTABLE>_PATH`` environment variable (``:``-separated),
        the ``ocrd-resources`` directory under ``xdg_data_home``,
        ``/usr/local/share/ocrd-resources`` and finally ``moduled``.
    """
    # Look the working directory up per call: a ``cwd=getcwd()`` default is
    # evaluated once at import time and goes stale after any chdir()
    if cwd is None:
        cwd = getcwd()
    xdg_data_home = xdg_data_home if xdg_data_home else config.XDG_DATA_HOME
    # e.g. ``ocrd-foo`` -> ``OCRD_FOO_PATH``
    processor_path_var = '%s_PATH' % executable.replace('-', '_').upper()
    candidates = [join(cwd, fname)]
    if processor_path_var in environ:
        candidates += [join(x, fname) for x in environ[processor_path_var].split(':')]
    candidates.append(join(xdg_data_home, 'ocrd-resources', executable, fname))
    candidates.append(join('/usr/local/share/ocrd-resources', executable, fname))
    if moduled:
        candidates.append(join(moduled, fname))
    return candidates
126
127
def list_all_resources(executable, moduled=None, xdg_data_home=None):
    """
    List all processor resources in the filesystem according to
    https://ocr-d.de/en/spec/ocrd_tool#file-parameters

    Collected are the entries of every directory named in the
    ``<EXECUTABLE>_PATH`` environment variable, plus - depending on the
    ``resource_locations`` of the processor's ocrd-tool description - the
    entries of its ``ocrd-resources`` directory under ``xdg_data_home``
    (default ``config.XDG_DATA_HOME``), of its directory under
    ``/usr/local/share/ocrd-resources``, and all files below ``moduled``
    that do not match a known code/cache pattern. Directories among the
    collected entries are expanded in turn (``.git`` excepted).

    Returns a sorted list of path strings.
    """
    try:
        resource_locations = get_ocrd_tool_json(executable)['resource_locations']
    except FileNotFoundError:
        # The processor whose resource_locations we want is not installed:
        # assume the default
        resource_locations = ['data', 'cwd', 'system', 'module']
    if not xdg_data_home:
        xdg_data_home = config.XDG_DATA_HOME
    found = []
    # XXX the 'cwd' location is deliberately not listed: it would produce
    # too many false positives
    processor_path_var = '%s_PATH' % executable.replace('-', '_').upper()
    if processor_path_var in environ:
        for processor_path in environ[processor_path_var].split(':'):
            found.extend(_direct_entries(Path(processor_path)))
    if 'data' in resource_locations:
        found.extend(_direct_entries(Path(xdg_data_home, 'ocrd-resources', executable)))
    if 'system' in resource_locations:
        found.extend(_direct_entries(Path('/usr/local/share/ocrd-resources', executable)))
    if 'module' in resource_locations and moduled:
        # Python distributions do not distinguish between code and data, but
        # only data files are wanted here
        # todo: more code and cache exclusion patterns!
        non_data_patterns = ['*.py', '*.py[cod]', '*~', 'ocrd-tool.json',
                             'environment.pickle', 'resource_list.yml', 'lib.bash']
        # recurse fully, keeping files only
        found.extend(
            resource for resource in itertree(Path(moduled))
            if not resource.is_dir()
            and not any(resource.match(pattern) for pattern in non_data_patterns))
    # NOTE(review): entries appended here are visited by this same loop, so
    # nested directories are expanded as well, not just one level - confirm
    # whether a single level was intended
    position = 0
    while position < len(found):
        entry = found[position]
        if entry.is_dir() and entry.name != '.git':
            found.extend(entry.iterdir())
        position += 1
    return sorted(map(str, found))


def _direct_entries(directory):
    """Return an iterable of the entries of ``directory``; empty if it is not a directory."""
    return directory.iterdir() if directory.is_dir() else ()
177
178
def get_processor_resource_types(executable, ocrd_tool=None):
    """
    Determine what type of resource parameters a processor needs.

    Return the ``content-type`` of every parameter that declares one. The
    single-item list ``['*/*']`` (arbitrary files or directories allowed) is
    returned when no parameter declares a content type, or when no
    ``ocrd_tool`` is given and ``executable`` is not found on the ``PATH``.
    """
    if not ocrd_tool:
        if not which(executable):
            # processor not installed: assume both files and directories
            return ['*/*']
        ocrd_tool = get_ocrd_tool_json(executable)
    declared_types = [
        parameter['content-type']
        for parameter in ocrd_tool.get('parameters', {}).values()
        if 'content-type' in parameter
    ]
    if declared_types:
        return declared_types
    # No parameter is a resource (or the resource parameters are not
    # properly declared), so allow both directories and files
    return ['*/*']
197
198
# ht @pabs3
199
# https://github.com/untitaker/python-atomicwrites/issues/42
200
class AtomicWriterPerms(AtomicWriter):
    """
    ``AtomicWriter`` that applies permission bits to the file object it hands out.
    """

    def get_fileobject(self, **kwargs):
        """
        Return the parent's file object after setting its mode.

        The mode is copied from the existing file at ``self._path``; if no
        such file exists, ``0o664`` restricted by the process umask is used.
        """
        fileobj = super().get_fileobject(**kwargs)
        try:
            mode = stat(self._path).st_mode
        except FileNotFoundError:
            # Creating a new file, emulate what os.open() does.
            # umask() can only be read by setting it, so set and restore.
            current_mask = umask(0)
            umask(current_mask)
            mode = 0o664 & ~current_mask
        chmod(fileobj.fileno(), mode)
        return fileobj
213
214
@contextmanager
def atomic_write(fpath):
    """
    Context manager yielding a file object for writing ``fpath``.

    Delegates to ``atomicwrites.atomic_write`` with ``overwrite=True`` and
    ``AtomicWriterPerms`` as the writer class.
    """
    writer = atomic_write_(fpath, writer_cls=AtomicWriterPerms, overwrite=True)
    with writer as fileobj:
        yield fileobj
218
219
220
def is_file_in_directory(directory, file):
    """
    Return True if ``file`` is in ``directory``.

    The check is purely lexical: the path components of ``directory`` must be
    the leading components of ``file``. The file system is not consulted.
    """
    dir_parts = Path(directory).parts
    leading_file_parts = Path(file).parts[:len(dir_parts)]
    return leading_file_parts == dir_parts
227
228
def itertree(path):
    """
    Recursively enumerate ``path``, yielding ``pathlib.Path`` objects.

    The contents of a directory are yielded before the directory itself;
    ``path`` is therefore always the last item.
    """
    node = path if isinstance(path, Path) else Path(path)
    if node.is_dir():
        for child in node.iterdir():
            for descendant in itertree(child):
                yield descendant
    yield node
238
239
def directory_size(path):
    """
    Return the summed size in bytes of all regular files below directory ``path``.
    """
    total = 0
    for entry in Path(path).glob('**/*'):
        if entry.is_file():
            total += entry.stat().st_size
    return total
245
246
def guess_media_type(input_file : str, fallback : str = None, application_xml : str = 'application/xml'):
    """
    Guess the media type of a file path

    Tried in order: ``filetype.guess``, then ``mimetypes.guess_type``, then a
    lookup of the file's joined suffixes in ``EXT_TO_MIME``, which defaults
    to ``fallback``.

    Args:
        input_file (str): path of the file to examine
        fallback (str, optional): media type to use if nothing else matches
        application_xml (str): value returned in place of ``application/xml``

    Returns:
        str: the media type

    Raises:
        ValueError: if no media type was found and no ``fallback`` was given
    """
    mimetype = filetype_guess(input_file)
    if mimetype is not None:
        mimetype = mimetype.mime
    else:
        mimetype = mimetypes_guess(input_file)[0]
    if mimetype is None:
        # join all suffixes so that multi-part extensions can be looked up
        mimetype = EXT_TO_MIME.get(''.join(Path(input_file).suffixes), fallback)
    if mimetype is None:
        # name the offending file; the message used to end in a stray "must"
        raise ValueError(f"Could not determine MIME type of input_file '{input_file}'")
    if mimetype == 'application/xml':
        mimetype = application_xml
    return mimetype
262
263
@contextmanager
def redirect_stderr_and_stdout_to_file(filename):
    """
    Context manager that redirects ``sys.stderr`` and ``sys.stdout`` to ``filename``.

    The file is opened in text append mode with UTF-8 encoding and closed on exit.
    """
    with open(filename, 'at', encoding='utf-8') as logfile, \
            redirect_stderr(logfile), redirect_stdout(logfile):
        yield
268