Passed
Push — master ( 9bf9cc...799459 )
by Konstantin
04:18 queued 01:14
created

ocrd_utils.os   C

Complexity

Total Complexity 56

Size/Duplication

Total Lines 251
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 56
eloc 168
dl 0
loc 251
rs 5.5199
c 0
b 0
f 0

13 Functions

Rating   Name   Duplication   Size   Complexity  
A itertree() 0 10 4
A list_resource_candidates() 0 16 4
B pushd_popd() 0 20 7
A get_ocrd_tool_json() 0 14 3
A get_processor_resource_types() 0 19 4
A unzip_file_to_dir() 0 7 1
A atomic_write() 0 4 2
A abspath() 0 9 2
F list_all_resources() 0 50 18
A guess_media_type() 0 16 5
A is_file_in_directory() 0 7 1
A get_moduledir() 0 8 2
A directory_size() 0 6 1

1 Method

Rating   Name   Duplication   Size   Complexity  
A AtomicWriterPerms.get_fileobject() 0 12 2

How to fix   Complexity   

Complexity

Complex classes like ocrd_utils.os often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
Operating system functions.
3
"""
4
__all__ = [
5
    'abspath',
6
    'directory_size',
7
    'is_file_in_directory',
8
    'get_ocrd_tool_json',
9
    'get_moduledir',
10
    'get_processor_resource_types',
11
    'guess_media_type',
12
    'pushd_popd',
13
    'unzip_file_to_dir',
14
    'atomic_write',
15
]
16
17
from tempfile import TemporaryDirectory, gettempdir
18
from functools import lru_cache
19
import contextlib
20
from distutils.spawn import find_executable as which
21
from json import loads
22
from json.decoder import JSONDecodeError
23
from os import getcwd, chdir, stat, chmod, umask, environ
24
from pathlib import Path
25
from os.path import exists, abspath as abspath_, join, isdir
26
from zipfile import ZipFile
27
from subprocess import run, PIPE
28
from mimetypes import guess_type as mimetypes_guess
29
from filetype import guess as filetype_guess
30
31
from atomicwrites import atomic_write as atomic_write_, AtomicWriter
32
33
from .constants import XDG_DATA_HOME, EXT_TO_MIME
34
from .logging import getLogger
35
36
def abspath(url):
    """
    Turn a local path or a ``file://`` URL into an absolute path.

    A leading ``file://`` is cut off first; whatever remains is passed
    to ``os.path.abspath``.
    """
    scheme = 'file://'
    local_path = url[len(scheme):] if url.startswith(scheme) else url
    return abspath_(local_path)
45
46
@contextlib.contextmanager
def pushd_popd(newcwd=None, tempdir=False):
    """
    Temporarily change the working directory.

    Args:
        newcwd: directory to change into. If falsy (and ``tempdir`` is not
            set) the working directory is left untouched.
        tempdir: if true, create a temporary directory, change into it and
            delete it again when the context is left.

    Yields:
        pathlib.Path: resolved path of the directory used inside the context.

    Raises:
        Exception: if both ``newcwd`` and ``tempdir`` are given.
    """
    if newcwd and tempdir:
        raise Exception("pushd_popd can accept either newcwd or tempdir, not both")
    try:
        oldcwd = getcwd()
    except FileNotFoundError:
        # This happens when a directory is deleted before the context is exited
        oldcwd = gettempdir()
    if tempdir:
        with TemporaryDirectory() as tempcwd:
            try:
                chdir(tempcwd)
                yield Path(tempcwd).resolve()
            finally:
                # Step out *before* TemporaryDirectory removes the directory,
                # so the process never sits in a directory being deleted.
                chdir(oldcwd)
    else:
        try:
            if newcwd:
                chdir(newcwd)
                # Ask the OS where we ended up: resolving a relative
                # ``newcwd`` after the chdir would resolve it against the
                # new working directory and yield ``newcwd/newcwd``.
                target = Path(getcwd())
            else:
                # Nothing to change into; report the directory we stay in
                # instead of failing on ``Path(None)``.
                target = Path(oldcwd)
            yield target.resolve()
        finally:
            chdir(oldcwd)
66
67
def unzip_file_to_dir(path_to_zip, output_directory):
    """
    Extract a ZIP archive to a directory

    Args:
        path_to_zip: path of the ZIP archive to read.
        output_directory: directory the archive members are extracted into.
    """
    # The context manager closes the archive even if extraction fails.
    with ZipFile(path_to_zip, 'r') as archive:
        archive.extractall(output_directory)
74
75
@lru_cache()
def get_ocrd_tool_json(executable):
    """
    Get the ``ocrd-tool`` description of ``executable``.

    Runs ``executable --dump-json`` and parses its standard output as JSON.
    If the executable cannot be started, or its output is not valid JSON,
    the problem is logged and an empty description is used instead.

    The returned dict always has a ``resource_locations`` entry; when the
    tool does not declare one, ``['data', 'cwd', 'system', 'module']`` is
    filled in.

    Results are memoized per ``executable`` by ``lru_cache``, so repeated
    calls return the same dict object.
    """
    log = getLogger('ocrd_utils.get_ocrd_tool_json')
    try:
        ocrd_tool = loads(run([executable, '--dump-json'], stdout=PIPE).stdout)
    except OSError as e:
        # The process could not even be started (missing, not executable, ...)
        log.error(f'{executable} --dump-json could not be run: {e}')
        ocrd_tool = {}
    except JSONDecodeError as e:
        log.error(f'{executable} --dump-json produced invalid JSON: {e}')
        ocrd_tool = {}
    ocrd_tool.setdefault('resource_locations', ['data', 'cwd', 'system', 'module'])
    return ocrd_tool
89
90
@lru_cache()
def get_moduledir(executable):
    """
    Ask ``executable`` for its module directory.

    Runs ``executable --dump-module-dir`` and returns its standard output
    (decoded as UTF-8) with trailing newlines removed. If the executable
    cannot be started, the error is logged and ``None`` is returned.

    Results are memoized per ``executable`` by ``lru_cache``.
    """
    try:
        completed = run([executable, '--dump-module-dir'], encoding='utf-8', stdout=PIPE)
    except OSError as e:
        # Only starting the process can fail here; no JSON is parsed.
        getLogger('ocrd_utils.get_moduledir').error(f'{executable} --dump-module-dir failed: {e}')
        return None
    return completed.stdout.rstrip('\n')
98
99
def list_resource_candidates(executable, fname, cwd=None, moduled=None, xdg_data_home=None):
    """
    Generate candidates for processor resources according to
    https://ocr-d.de/en/spec/ocrd_tool#file-parameters

    Args:
        executable: name of the processor executable.
        fname: file name of the resource.
        cwd: directory used for the first candidate; defaults to the
            working directory at the time of the call.
        moduled: module directory of the processor, if any.
        xdg_data_home: data home directory; falls back to ``XDG_DATA_HOME``.

    Returns:
        list of str: candidate paths in this order: ``cwd``, every entry of
        the ``<EXECUTABLE>_PATH`` environment variable (colon-separated),
        the data home, ``/usr/local/share/ocrd-resources`` and finally
        ``moduled`` (if given).
    """
    if cwd is None:
        # Looked up per call: a ``cwd=getcwd()`` default would be frozen
        # at import time and go stale after any chdir.
        cwd = getcwd()
    if not xdg_data_home:
        xdg_data_home = XDG_DATA_HOME
    candidates = [join(cwd, fname)]
    processor_path_var = '%s_PATH' % executable.replace('-', '_').upper()
    if processor_path_var in environ:
        candidates += [join(x, fname) for x in environ[processor_path_var].split(':')]
    candidates.append(join(xdg_data_home, 'ocrd-resources', executable, fname))
    candidates.append(join('/usr/local/share/ocrd-resources', executable, fname))
    if moduled:
        candidates.append(join(moduled, fname))
    return candidates
115
116
def list_all_resources(executable, moduled=None, xdg_data_home=None):
    """
    Collect every processor resource found in the filesystem, following
    https://ocr-d.de/en/spec/ocrd_tool#file-parameters

    The directories named in the ``<EXECUTABLE>_PATH`` environment variable
    are always scanned. The data home and the system directory are scanned
    when the tool's ``resource_locations`` contain ``data`` / ``system``.
    When ``module`` is listed and ``moduled`` is given, all files below
    ``moduled`` are added except those matching a set of code and cache
    file patterns.

    Every directory among the collected entries (other than ones named
    ``.git``) is then expanded; newly added entries are examined as well,
    so nested directories are descended into.

    Returns:
        list of str: the sorted paths of everything collected.
    """
    try:
        locations = get_ocrd_tool_json(executable)['resource_locations']
    except FileNotFoundError:
        # The processor whose resource locations we want is not installed:
        # assume the default
        locations = ['data', 'cwd', 'system', 'module']
    if not xdg_data_home:
        xdg_data_home = XDG_DATA_HOME

    # NOTE: the 'cwd' location is deliberately not scanned -- it would
    # list too many false positives.
    search_dirs = []
    env_name = '%s_PATH' % executable.replace('-', '_').upper()
    if env_name in environ:
        search_dirs.extend(Path(entry) for entry in environ[env_name].split(':'))
    if 'data' in locations:
        search_dirs.append(Path(xdg_data_home, 'ocrd-resources', executable))
    if 'system' in locations:
        search_dirs.append(Path('/usr/local/share/ocrd-resources', executable))

    found = []
    for search_dir in search_dirs:
        if search_dir.is_dir():
            found.extend(search_dir.iterdir())

    if moduled and 'module' in locations:
        # Python distributions do not distinguish between code and data;
        # we want data files only, so filter out known code/cache names.
        # todo: more code and cache exclusion patterns!
        excluded = ['*.py', '*.py[cod]', '*~', 'ocrd-tool.json',
                    'environment.pickle', 'resource_list.yml', 'lib.bash']
        # walk the module directory fully
        for resource in itertree(Path(moduled)):
            if resource.is_dir():
                continue
            if not any(resource.match(pattern) for pattern in excluded):
                found.append(resource)

    # Expand directories; the worklist grows while it is processed, so
    # entries added here get their turn, too.
    position = 0
    while position < len(found):
        entry = found[position]
        if entry.is_dir() and entry.name != '.git':
            found.extend(entry.iterdir())
        position += 1
    return sorted(str(entry) for entry in found)
166
167
def get_processor_resource_types(executable, ocrd_tool=None):
    """
    Find out which kinds of resource parameters a processor takes.

    Returns the ``content-type`` of every parameter that declares one, as a
    list of MIME types. The special list ``['*/*']`` (arbitrary files or
    directories allowed) is returned when no parameter declares a
    ``content-type``, or when no ``ocrd_tool`` is passed and ``executable``
    cannot be found.
    """
    if not ocrd_tool:
        if not which(executable):
            # Processor is not installed: assume both files and directories
            return ['*/*']
        ocrd_tool = get_ocrd_tool_json(executable)
    content_types = [
        parameter['content-type']
        for parameter in ocrd_tool.get('parameters', {}).values()
        if 'content-type' in parameter
    ]
    # An empty list means that no parameter is a resource (or that the
    # resource parameters are not properly declared): allow anything.
    return content_types or ['*/*']
186
187
# ht @pabs3
188
# https://github.com/untitaker/python-atomicwrites/issues/42
189
class AtomicWriterPerms(AtomicWriter):
    """
    ``AtomicWriter`` that sets permission bits on the file object it hands
    out: the mode of the file already at ``self._path`` if there is one,
    otherwise ``0o664`` reduced by the process umask.
    """

    def get_fileobject(self, **kwargs):
        handle = super().get_fileobject(**kwargs)
        try:
            permissions = stat(self._path).st_mode
        except FileNotFoundError:
            # Creating a new file, emulate what os.open() does.
            # umask() returns the previous mask, so set a throwaway
            # value and put the real one straight back.
            current_umask = umask(0)
            umask(current_umask)
            permissions = 0o664 & ~current_umask
        chmod(handle.fileno(), permissions)
        return handle
202
203
@contextlib.contextmanager
def atomic_write(fpath):
    """
    Context manager yielding a file object for writing ``fpath`` via
    ``atomicwrites.atomic_write``, with ``AtomicWriterPerms`` as the writer
    class and overwriting enabled.
    """
    writer = atomic_write_(fpath, writer_cls=AtomicWriterPerms, overwrite=True)
    with writer as handle:
        yield handle
207
208
209
def is_file_in_directory(directory, file):
    """
    Tell whether ``file`` lies in ``directory``.

    This is a comparison of path components only: the result is True when
    the leading parts of ``file`` are equal to the parts of ``directory``.
    """
    dir_parts = Path(directory).parts
    file_parts = Path(file).parts
    return file_parts[:len(dir_parts)] == dir_parts
216
217
def itertree(path):
    """
    Recursively enumerate ``path``, yielding ``pathlib.Path`` objects.

    The contents of a directory are yielded before the directory itself;
    ``path`` is therefore the last item produced.
    """
    root = path if isinstance(path, Path) else Path(path)
    if root.is_dir():
        for child in root.iterdir():
            for descendant in itertree(child):
                yield descendant
    yield root
227
228
def directory_size(path):
    """
    Add up the sizes (``st_size``) of all files found recursively below
    directory ``path``.
    """
    total = 0
    for entry in Path(path).glob('**/*'):
        if entry.is_file():
            total += entry.stat().st_size
    return total
234
235
def guess_media_type(input_file : str, fallback : str = None, application_xml : str = 'application/xml'):
    """
    Guess the media type of a file path

    Tries, in this order: ``filetype.guess``, ``mimetypes.guess_type`` and
    finally a lookup of the joined file suffixes in ``EXT_TO_MIME``.

    Args:
        input_file: path of the file to inspect.
        fallback: media type to use when none of the lookups succeeds.
        application_xml: value returned in place of ``application/xml``.

    Returns:
        str: the media type.

    Raises:
        ValueError: if no media type was found and no ``fallback`` is given.
    """
    detected = filetype_guess(input_file)
    if detected is not None:
        mimetype = detected.mime
    else:
        mimetype = mimetypes_guess(input_file)[0]
    if mimetype is None:
        mimetype = EXT_TO_MIME.get(''.join(Path(input_file).suffixes), fallback)
    if mimetype is None:
        raise ValueError(f"Could not determine MIME type of input_file '{input_file}'")
    if mimetype == 'application/xml':
        # Let the caller substitute a more specific type for generic XML
        mimetype = application_xml
    return mimetype
251