Passed
Pull Request — master (#967)
by Konstantin
02:34
created

OcrdResourceManager.download()   F

Complexity

Conditions 16

Size

Total Lines 54
Code Lines 46

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 46
dl 0
loc 54
rs 2.4
c 0
b 0
f 0
cc 16
nop 10

How to fix   Long Method    Complexity    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

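Commonly applied refactorings include Extract Method; if many parameters or temporary variables are present, Replace Temp with Query, Introduce Parameter Object and Preserve Whole Object are also candidates.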

Complexity

Complex methods like ocrd.resource_manager.OcrdResourceManager.download() often do a lot of different things. To break such a method down, we need to identify a cohesive component within its class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists, such as Replace Parameter with Method Call, Preserve Whole Object, and Introduce Parameter Object.

1
from pathlib import Path
2
from os.path import join
3
from json import loads
4
from os import environ, listdir, getcwd, path
5
from fnmatch import filter as apply_glob
6
from shutil import copytree, copy
7
from datetime import datetime
8
from tarfile import open as open_tarfile
9
from urllib.parse import urlparse, unquote
10
from subprocess import run, PIPE
11
from zipfile import ZipFile
12
13
import requests
14
from yaml import safe_load, safe_dump
15
import magic
16
17
# https://github.com/OCR-D/core/issues/867
18
# https://stackoverflow.com/questions/50900727/skip-converting-entities-while-loading-a-yaml-string-using-pyyaml
19
import yaml.constructor
20
# Register the plain-string constructor for the YAML timestamp tag, so that
# date-like scalars are constructed as str by the safe constructor.
yaml.constructor.SafeConstructor.yaml_constructors[u'tag:yaml.org,2002:timestamp'] = \
    yaml.constructor.SafeConstructor.yaml_constructors[u'tag:yaml.org,2002:str']
22
23
from ocrd_validators import OcrdResourceListValidator
24
from ocrd_utils import getLogger, directory_size, get_moduledir
25
from ocrd_utils.os import get_processor_resource_types, list_all_resources, pushd_popd, get_ocrd_tool_json
26
from .constants import RESOURCE_LIST_FILENAME, RESOURCE_USER_LIST_COMMENT
27
28
class OcrdResourceManager():
29
30
    """
31
    Managing processor resources
32
    """
33
    def __init__(self, userdir=None, xdg_config_home=None, xdg_data_home=None, skip_init=False):
34
        self.log = getLogger('ocrd.resource_manager')
35
        self.database = {}
36
37
        self._xdg_data_home = xdg_data_home
38
        self._xdg_config_home = xdg_config_home
39
        self._userdir = userdir
40
        self.user_list = Path(self.xdg_config_home, 'ocrd', 'resources.yml')
41
42
        if not skip_init:
43
            self.load_resource_list(Path(RESOURCE_LIST_FILENAME))
44
            if not self.user_list.exists():
45
                if not self.user_list.parent.exists():
46
                    self.user_list.parent.mkdir(parents=True)
47
                self.save_user_list()
48
            self.load_resource_list(self.user_list)
49
50
    @property
51
    def userdir(self):
52
        if not self._userdir:
53
            self._userdir = path.expanduser('~')
54
            if 'HOME' in environ and environ['HOME'] != path.expanduser('~'):
55
                self._userdir = environ['HOME']
56
        return self._userdir
57
58
    @property
59
    def xdg_data_home(self):
60
        if not self._xdg_data_home:
61
            if 'XDG_DATA_HOME' in environ:
62
                self._xdg_data_home = environ['XDG_DATA_HOME']
63
            else:
64
                self._xdg_data_home = join(self.userdir, '.local', 'share')
65
        return self._xdg_data_home
66
67
    @property
68
    def xdg_config_home(self):
69
        if not self._xdg_config_home:
70
            if 'XDG_CONFIG_HOME' in environ:
71
                self._xdg_config_home = environ['XDG_CONFIG_HOME']
72
            else:
73
                self._xdg_config_home = join(self.userdir, '.config')
74
        return self._xdg_config_home
75
76
    def save_user_list(self, database=None):
        """
        Write a resource database to the user resource list file.

        The file consists of ``RESOURCE_USER_LIST_COMMENT``, a newline and the
        YAML dump of the database.

        Args:
            database (dict): Mapping to serialize. Defaults to ``self.database``
                when omitted; an explicitly passed empty mapping is written as-is.
        """
        # Compare against None: an empty mapping is a legitimate thing to save
        if database is None:
            database = self.database
        with open(self.user_list, 'w', encoding='utf-8') as f:
            f.write(RESOURCE_USER_LIST_COMMENT)
            f.write('\n')
            f.write(safe_dump(database))
83
84
    def load_resource_list(self, list_filename, database=None):
85
        if not database:
86
            database = self.database
87
        if list_filename.is_file():
88
            with open(list_filename, 'r', encoding='utf-8') as f:
89
                list_loaded = safe_load(f) or {}
90
            report = OcrdResourceListValidator.validate(list_loaded)
91
            if not report.is_valid:
92
                self.log.error('\n'.join(report.errors))
93
                raise ValueError("Resource list %s is invalid!" % (list_filename))
94
            for executable, resource_list in list_loaded.items():
95
                if executable not in database:
96
                    database[executable] = []
97
                # Prepend, so user provided is sorted before builtin
98
                database[executable] = list_loaded[executable] + database[executable]
99
        return database
100
101
    def list_available(self, executable=None, dynamic=True, name=None, database=None, url=None):
102
        """
103
        List models available for download by processor
104
        """
105
        if not database:
106
            database = self.database
107
        if not executable:
108
            return database.items()
109
        if dynamic:
110
            for exec_dir in environ['PATH'].split(':'):
111
                for exec_path in Path(exec_dir).glob(f'{executable}'):
112
                    self.log.debug(f"Inspecting '{exec_path} --dump-json' for resources")
113
                    ocrd_tool = get_ocrd_tool_json(exec_path)
114
                    for resdict in ocrd_tool.get('resources', ()):
115
                        if exec_path.name not in database:
116
                            database[exec_path.name] = []
117
                        database[exec_path.name].append(resdict)
118
            database = self._dedup_database(database)
119
        found = False
120
        ret = []
121
        for k in database:
122
            if apply_glob([k], executable):
123
                found = True
124
                restuple = (k, [])
125
                ret.append(restuple)
126
                for resdict in database[k]:
127
                    if name and resdict['name'] != name:
128
                        continue
129
                    if url and resdict['url'] != url:
130
                        continue
131
                    restuple[1].append(resdict)
132
        if not found:
133
            ret = [(executable, [])]
134
        return ret
135
136
    def list_installed(self, executable=None):
        """
        List installed resources, matching with registry by ``name``

        Args:
            executable (str): Restrict the listing to this executable. If empty, all
                executables of the database plus all ``ocrd-*`` entries found in the
                ``ocrd-resources`` directories are considered.

        Returns:
            list: ``(executable, resources)`` tuples; every resource dict carries
            the location of the resource under ``path``.
        """
        if executable:
            candidates = [executable]
        else:
            # resources we know about
            candidates = list(self.database.keys())
            # resources in the file system
            for base_dir in (self.xdg_data_home, '/usr/local/share'):
                parent_dir = join(base_dir, 'ocrd-resources')
                if Path(parent_dir).exists():
                    candidates.extend(entry for entry in listdir(parent_dir) if entry.startswith('ocrd-'))
        installed = []
        for this_executable in set(candidates):
            mimetypes = get_processor_resource_types(this_executable)
            moduledir = get_moduledir(this_executable)
            found = []
            for res_filename in list_all_resources(this_executable, moduled=moduledir, xdg_data_home=self.xdg_data_home):
                res_path = Path(res_filename)
                if '*/*' not in mimetypes:
                    # skip directories unless the processor accepts them ...
                    if res_path.is_dir() and 'text/directory' not in mimetypes:
                        continue
                    # ... and files if it accepts nothing but directories
                    if res_path.is_file() and ['text/directory'] == mimetypes:
                        continue
                res_name = res_path.name
                if res_path.is_file():
                    res_type = 'file'
                    res_size = res_path.stat().st_size
                else:
                    res_type = 'directory'
                    res_size = directory_size(res_path)
                known = [entry for entry in self.database.get(this_executable, []) if entry['name'] == res_name]
                if known:
                    resdict = known[0]
                elif str(res_path.parent) == moduledir:
                    resdict = {
                        'name': res_name,
                        'url': str(res_path),
                        'description': 'Found at module',
                        'type': res_type,
                        'size': res_size
                    }
                else:
                    resdict = self.add_to_user_database(this_executable, res_path, resource_type=res_type)
                resdict['path'] = str(res_path)
                found.append(resdict)
            installed.append((this_executable, found))
        return installed
182
183
    def add_to_user_database(self, executable, res_filename, url=None, resource_type='file'):
        """
        Add a stub entry to the user resource.yml

        The user list is read, extended by a stub for the resource unless it
        already lists one of the same name for ``executable``, written back and
        loaded again.

        Args:
            executable (str): Executable the resource belongs to.
            res_filename (str or Path): Location of the resource in the file system.
            url (str): URL to record in the stub; ``'???'`` if not given.
            resource_type (str): Value for the ``type`` field of the stub.

        Returns:
            dict: The entry already present in the user list, or the new stub.
        """
        res_path = Path(res_filename)
        res_name = res_path.name
        self.log.info("%s resource '%s' (%s) not a known resource, creating stub in %s", executable, res_name, str(res_filename), self.user_list)
        if res_path.is_dir():
            res_size = directory_size(res_filename)
        else:
            res_size = res_path.stat().st_size
        with open(self.user_list, 'r', encoding='utf-8') as f:
            user_database = safe_load(f) or {}
        user_database.setdefault(executable, [])
        resources_found = self.list_available(executable=executable, name=res_name, database=user_database)[0][1]
        if resources_found:
            resdict = resources_found[0]
        else:
            resdict = {
                'name': res_name,
                'url': url if url else '???',
                'description': 'Found at %s on %s' % (self.resource_dir_to_location(res_filename), datetime.now()),
                'version_range': '???',
                'type': resource_type,
                'size': res_size
            }
            user_database[executable].append(resdict)
        self.save_user_list(user_database)
        # NOTE(review): load_resource_list prepends the file's entries to self.database,
        # so entries of the user list that were loaded before end up there again
        self.load_resource_list(self.user_list)
        return resdict
213
214
    @property
215
    def default_resource_dir(self):
216
        return self.location_to_resource_dir('data')
217
218
    def location_to_resource_dir(self, location):
219
        return '/usr/local/share/ocrd-resources' if location == 'system' else \
220
                join(self.xdg_data_home, 'ocrd-resources') if location == 'data' else \
221
                getcwd()
222
223
    def resource_dir_to_location(self, resource_path):
224
        resource_path = str(resource_path)
225
        return 'system' if resource_path.startswith('/usr/local/share/ocrd-resources') else \
226
               'data' if resource_path.startswith(join(self.xdg_data_home, 'ocrd-resources')) else \
227
               'cwd' if resource_path.startswith(getcwd()) else \
228
               resource_path
229
230
    def parameter_usage(self, name, usage='as-is'):
231
        if usage == 'as-is':
232
            return name
233
        elif usage == 'without-extension':
234
            return Path(name).stem
235
        raise ValueError("No such usage '%s'" % usage)
236
237
    def _download_impl(self, url, filename, progress_cb=None):
        """
        Stream the content at ``url`` into the local file ``filename``.

        Args:
            url (str): URL to request.
            filename (str or Path): Destination file, created or truncated.
            progress_cb (callable): If given, called with the length of every
                received chunk.

        Raises:
            requests.HTTPError: If the response carries an error status. The
                destination is not touched in that case.
        """
        log = getLogger('ocrd.resource_manager._download_impl')
        log.info("Downloading %s to %s", url, filename)
        with requests.get(url, stream=True) as r:
            # Check the status before opening the destination, so that an HTTP
            # error page is never stored as if it were the resource
            r.raise_for_status()
            with open(filename, 'wb') as f:
                try:
                    for data in r.iter_content(chunk_size=4096):
                        if progress_cb:
                            progress_cb(len(data))
                        f.write(data)
                except Exception:
                    # A truncated file would otherwise look like a finished download
                    f.close()
                    Path(filename).unlink()
                    raise
246
247
    def _copy_impl(self, src_filename, filename, progress_cb=None):
248
        log = getLogger('ocrd.resource_manager._copy_impl')
249
        log.info("Copying %s to %s", src_filename, filename)
250
        if Path(src_filename).is_dir():
251
            log.info(f"Copying recursively from {src_filename} to {filename}")
252
            for child in Path(src_filename).rglob('*'):
253
                child_dst = Path(filename) / child.relative_to(src_filename)
254
                child_dst.parent.mkdir(parents=True, exist_ok=True)
255
                with open(child_dst, 'wb') as f_out, open(child, 'rb') as f_in:
256
                    while True:
257
                        chunk = f_in.read(4096)
258
                        if chunk:
259
                            f_out.write(chunk)
260
                            if progress_cb:
261
                                progress_cb(len(chunk))
262
                        else:
263
                            break
264
        else:
265
            with open(filename, 'wb') as f_out, open(src_filename, 'rb') as f_in:
266
                while True:
267
                    chunk = f_in.read(4096)
268
                    if chunk:
269
                        f_out.write(chunk)
270
                        if progress_cb:
271
                            progress_cb(len(chunk))
272
                    else:
273
                        break
274
275
    # TODO Proper caching (make head request for size, If-Modified etc)
276
    def download(
277
        self,
278
        executable,
279
        url,
280
        basedir,
281
        overwrite=False,
282
        no_subdir=False,
283
        name=None,
284
        resource_type='file',
285
        path_in_archive='.',
286
        progress_cb=None,
287
    ):
288
        """
289
        Download a resource by URL
290
        """
291
        log = getLogger('ocrd.resource_manager.download')
292
        destdir = Path(basedir) if no_subdir else Path(basedir, executable)
293
        if not name:
294
            url_parsed = urlparse(url)
295
            name = Path(unquote(url_parsed.path)).name
296
        fpath = Path(destdir, name)
297
        is_url = url.startswith('https://') or url.startswith('http://')
298
        if fpath.exists() and not overwrite:
299
            log.info("%s to be %s to %s which already exists and overwrite is False" % (url, 'downloaded' if is_url else 'copied', fpath))
300
            return fpath
301
        destdir.mkdir(parents=True, exist_ok=True)
302
        if resource_type in ('file', 'directory'):
303
            if is_url:
304
                self._download_impl(url, fpath, progress_cb)
305
            else:
306
                self._copy_impl(url, fpath, progress_cb)
307
        elif resource_type == 'archive':
308
            archive_fname = 'download.tar.xx'
309
            with pushd_popd(tempdir=True) as tempdir:
310
                if is_url:
311
                    self._download_impl(url, archive_fname, progress_cb)
312
                else:
313
                    self._copy_impl(url, archive_fname, progress_cb)
314
                Path('out').mkdir()
315
                with pushd_popd('out'):
316
                    mimetype = magic.from_file(f'../{archive_fname}', mime=True)
317
                    log.info("Extracting %s archive to %s/out" % (mimetype, tempdir))
318
                    if mimetype == 'application/zip':
319
                        with ZipFile(f'../{archive_fname}', 'r') as zipf:
320
                            zipf.extractall()
321
                    else:
322
                        with open_tarfile(f'../{archive_fname}', 'r:*') as tar:
323
                            tar.extractall()
324
                    log.info("Copying '%s' from archive to %s" % (path_in_archive, fpath))
325
                    if Path(path_in_archive).is_dir():
326
                        copytree(path_in_archive, str(fpath))
327
                    else:
328
                        copy(path_in_archive, str(fpath))
329
        return fpath
330
331
    def _dedup_database(self, database=None, dedup_key='name'):
332
        """
333
        Deduplicate resources by name
334
        """
335
        if not database:
336
            database = self.database
337
        for executable, reslist in database.items():
338
            reslist_dedup = []
339
            for resdict in reslist:
340
                if not any(r[dedup_key] == resdict[dedup_key] for r in reslist_dedup):
341
                    reslist_dedup.append(resdict)
342
            database[executable] = reslist_dedup
343
        return database
344