Passed
Pull Request — master (#800)
by Konstantin
05:17
created

ocrd_utils.os   B

Complexity

Total Complexity 47

Size/Duplication

Total Lines 213
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 47
eloc 134
dl 0
loc 213
rs 8.64
c 0
b 0
f 0

10 Functions

Rating   Name   Duplication   Size   Complexity  
B pushd_popd() 0 20 7
A unzip_file_to_dir() 0 7 1
A abspath() 0 9 2
A itertree() 0 10 4
A list_resource_candidates() 0 16 4
A get_ocrd_tool_json() 0 15 3
A get_processor_resource_types() 0 19 4
A atomic_write() 0 4 2
F list_all_resources() 0 44 17
A is_file_in_directory() 0 7 1

1 Method

Rating   Name   Duplication   Size   Complexity  
A AtomicWriterPerms.get_fileobject() 0 12 2

How to fix   Complexity   

Complexity

Complex classes like ocrd_utils.os often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
Operating system functions.
3
"""
4
__all__ = [
5
    'abspath',
6
    'is_file_in_directory',
7
    'get_ocrd_tool_json',
8
    'get_processor_resource_types',
9
    'pushd_popd',
10
    'unzip_file_to_dir',
11
    'atomic_write',
12
]
13
14
from tempfile import TemporaryDirectory
15
from functools import lru_cache
16
import contextlib
17
from distutils.spawn import find_executable as which
18
from json import loads
19
from json.decoder import JSONDecodeError
20
from os import getcwd, chdir, stat, chmod, umask, environ
21
from pathlib import Path
22
from os.path import exists, abspath as abspath_, join, isdir
23
from zipfile import ZipFile
24
from subprocess import run, PIPE
25
26
from atomicwrites import atomic_write as atomic_write_, AtomicWriter
27
28
from .constants import XDG_DATA_HOME
29
30
def abspath(url):
    """
    Resolve a plain path or a ``file://`` URL to an absolute filesystem path.

    A leading ``file://`` scheme is cut off, the remainder is handed
    to ``os.path.abspath``.
    """
    scheme = 'file://'
    local_path = url[len(scheme):] if url.startswith(scheme) else url
    return abspath_(local_path)
39
40
@contextlib.contextmanager
def pushd_popd(newcwd=None, tempdir=False):
    """
    Context manager that changes the working directory and restores it on exit.

    Args:
        newcwd: directory to change into; if falsy, the working directory
            is left unchanged for the duration of the context.
        tempdir: if true, create a fresh temporary directory, change into
            it and remove it again when the context is left.

    Yields:
        The path of the temporary directory if ``tempdir`` is set,
        otherwise ``newcwd`` as passed in (possibly ``None``).

    Raises:
        ValueError: if both ``newcwd`` and ``tempdir`` are given.
    """
    if newcwd and tempdir:
        raise ValueError("pushd_popd can accept either newcwd or tempdir, not both")
    try:
        oldcwd = getcwd()
    except FileNotFoundError:
        # This happens when a directory is deleted before the context is exited.
        # Fall back to the platform's temporary directory rather than a
        # hard-coded POSIX path.
        from tempfile import gettempdir
        oldcwd = gettempdir()
    if tempdir:
        with TemporaryDirectory() as tempcwd:
            chdir(tempcwd)
            try:
                yield tempcwd
            finally:
                # Leave the temporary directory *before* it gets removed,
                # deleting the current working directory fails on some platforms
                chdir(oldcwd)
    else:
        try:
            if newcwd:
                chdir(newcwd)
            yield newcwd
        finally:
            chdir(oldcwd)
60
61
def unzip_file_to_dir(path_to_zip, output_directory):
    """
    Extract a ZIP archive to a directory

    Args:
        path_to_zip: path of the ZIP archive to read.
        output_directory: directory all members are extracted into.
    """
    # The context manager closes the archive even if extraction fails
    with ZipFile(path_to_zip, 'r') as archive:
        archive.extractall(output_directory)
68
69
@lru_cache()
def get_ocrd_tool_json(executable):
    """
    Get the ``ocrd-tool`` description of ``executable``.

    Runs ``executable --dump-json`` and parses its standard output as JSON.
    If the output is not valid JSON, an empty description is used instead.
    In either case, a missing ``resource_locations`` entry is filled in
    with ``['data', 'cwd', 'system', 'module']``.

    Results are memoized per ``executable``, so every caller receives the
    same dictionary object.

    Errors from spawning the process (e.g. ``FileNotFoundError`` for a
    missing executable) are not handled here and propagate to the caller.
    """
    try:
        ocrd_tool = loads(run([executable, '--dump-json'], stdout=PIPE).stdout)
    except JSONDecodeError:
        ocrd_tool = {}
    if 'resource_locations' not in ocrd_tool:
        ocrd_tool['resource_locations'] = ['data', 'cwd', 'system', 'module']
    return ocrd_tool
84
85
# def get_processor_resources_list(executable):
86
#     """
87
#     Get the list of resources that a processor is providing via
88
#     ``-list-resources``
89
#     """
90
91
def list_resource_candidates(executable, fname, cwd=None, moduled=None, xdg_data_home=None):
    """
    Generate candidates for processor resources according to
    https://ocr-d.de/en/spec/ocrd_tool#file-parameters

    Args:
        executable: name of the processor executable, e.g. ``ocrd-dummy``.
        fname: file name of the resource to look for.
        cwd: directory to use as working directory candidate; defaults to
            the working directory at the time of the call.
        moduled: optional module directory of the processor.
        xdg_data_home: data home directory; defaults to ``XDG_DATA_HOME``.

    Returns:
        A list of candidate paths, in lookup order: ``cwd``, the entries
        of the ``<EXECUTABLE>_PATH`` environment variable (``:``-separated),
        the data home, ``/usr/local/share/ocrd-resources`` and finally
        ``moduled`` if given. The paths are not checked for existence.
    """
    if cwd is None:
        # Resolve at call time: a ``getcwd()`` default in the signature would
        # be frozen to whatever the working directory was at import time.
        cwd = getcwd()
    xdg_data_home = XDG_DATA_HOME if not xdg_data_home else xdg_data_home
    candidates = [join(cwd, fname)]
    processor_path_var = '%s_PATH' % executable.replace('-', '_').upper()
    if processor_path_var in environ:
        candidates += [join(x, fname) for x in environ[processor_path_var].split(':')]
    candidates.append(join(xdg_data_home, 'ocrd-resources', executable, fname))
    candidates.append(join('/usr/local/share/ocrd-resources', executable, fname))
    if moduled:
        candidates.append(join(moduled, fname))
    return candidates
107
108
def _list_directory(directory):
    """
    Return the direct children of ``directory`` as a list, or an empty list if it is not a directory.
    """
    return list(directory.iterdir()) if directory.is_dir() else []


def _is_module_data_file(resource):
    """
    Tell whether ``resource`` is a data file, i.e. not a directory and not matching a code/cache pattern.
    """
    if resource.is_dir():
        return False
    # Python distributions do not distinguish between
    # code and data; `is_resource()` only singles out
    # files over directories; but we want data files only
    # todo: more code and cache exclusion patterns!
    excluded_patterns = ['*.py', '*.py[cod]', '*~', 'ocrd-tool.json']
    return not any(resource.match(pattern) for pattern in excluded_patterns)


def list_all_resources(executable, moduled=None, xdg_data_home=None):
    """
    List all processor resources in the filesystem according to
    https://ocr-d.de/en/spec/ocrd_tool#file-parameters

    Args:
        executable: name of the processor executable; its ``ocrd-tool``
            description determines which ``resource_locations`` are searched.
        moduled: optional module directory of the processor, searched
            recursively if ``module`` is among the resource locations.
        xdg_data_home: data home directory; defaults to ``XDG_DATA_HOME``.

    Returns:
        A sorted list of paths (as strings).
    """
    candidates = []
    resource_locations = get_ocrd_tool_json(executable)['resource_locations']
    xdg_data_home = XDG_DATA_HOME if not xdg_data_home else xdg_data_home
    # The 'cwd' location is deliberately not searched:
    # it would list too many false positives
    processor_path_var = '%s_PATH' % executable.replace('-', '_').upper()
    if processor_path_var in environ:
        for processor_path in environ[processor_path_var].split(':'):
            candidates += _list_directory(Path(processor_path))
    if 'data' in resource_locations:
        candidates += _list_directory(Path(xdg_data_home, 'ocrd-resources', executable))
    if 'system' in resource_locations:
        candidates += _list_directory(Path('/usr/local/share/ocrd-resources', executable))
    if 'module' in resource_locations and moduled:
        # recurse fully
        candidates += [resource for resource in itertree(Path(moduled))
                       if _is_module_data_file(resource)]
    # recurse once: iterate over a snapshot, so that entries added here are
    # not themselves descended into (extending the list being iterated would
    # turn this into an unbounded recursion)
    for parent in list(candidates):
        if parent.is_dir() and parent.name != '.git':
            candidates += parent.iterdir()
    return sorted(str(x) for x in candidates)
152
153
def get_processor_resource_types(executable, ocrd_tool=None):
    """
    Determine what type of resource parameters a processor needs.

    Args:
        executable: name of the processor executable; only consulted if
            ``ocrd_tool`` is not given.
        ocrd_tool: optional, already parsed ``ocrd-tool`` description of
            the processor.

    Return a list of MIME types (with the special value `*/*` to
    designate that arbitrary files or directories are allowed).
    """
    if not ocrd_tool:
        # if the processor in question is not installed, assume both files and directories
        if not which(executable):
            return ['*/*']
        ocrd_tool = get_ocrd_tool_json(executable)
    # A description without a 'parameters' section (e.g. the fallback used
    # when the processor did not emit valid JSON) is treated like one
    # without any parameters.
    content_types = [p['content-type'] for p in ocrd_tool.get('parameters', {}).values()
                     if 'content-type' in p]
    # None of the parameters for this processor are resources (or the
    # resource parameters are not properly declared), so output both
    # directories and files
    return content_types or ['*/*']
172
173
# ht @pabs3
174
# https://github.com/untitaker/python-atomicwrites/issues/42
175
class AtomicWriterPerms(AtomicWriter):
    """
    ``AtomicWriter`` that sets explicit permission bits on the file object it hands out.
    """

    def get_fileobject(self, **kwargs):
        """
        Obtain the file object from the parent class and ``chmod`` it.

        The mode is taken from the existing file at ``self._path``; if
        there is none, ``0o664`` restricted by the current umask is used.
        """
        fileobj = super().get_fileobject(**kwargs)
        try:
            permissions = stat(self._path).st_mode
        except FileNotFoundError:
            # Creating a new file, emulate what os.open() does:
            # the umask can only be read by setting it, so restore it right away
            current_umask = umask(0)
            umask(current_umask)
            permissions = 0o664 & ~current_umask
        chmod(fileobj.fileno(), permissions)
        return fileobj
188
189
@contextlib.contextmanager
def atomic_write(fpath):
    """
    Context manager yielding a file object for writing ``fpath``.

    Delegates to ``atomicwrites.atomic_write`` with ``AtomicWriterPerms``
    as writer class and overwriting enabled.
    """
    writer = atomic_write_(fpath, writer_cls=AtomicWriterPerms, overwrite=True)
    with writer as handle:
        yield handle
193
194
195
def is_file_in_directory(directory, file):
    """
    Tell whether the path components of ``directory`` form a prefix of the path components of ``file``.

    The comparison is purely lexical, neither path is resolved or
    checked for existence.
    """
    dir_parts = Path(directory).parts
    file_parts = Path(file).parts
    return file_parts[:len(dir_parts)] == dir_parts
202
203
def itertree(path):
    """
    Recursively walk ``path``, yielding ``Path`` objects.

    The contents of a directory are yielded before the directory
    itself; ``path`` comes last.
    """
    root = path if isinstance(path, Path) else Path(path)
    if root.is_dir():
        for child in root.iterdir():
            yield from itertree(child)
    yield root
213