OcrdResourceManager.download()   F
last analyzed

Complexity

Conditions 18

Size

Total Lines 57
Code Lines 47

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 47
dl 0
loc 57
rs 1.2
c 0
b 0
f 0
cc 18
nop 10

How to fix   Long Method    Complexity    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

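Commonly applied refactorings include Extract Method and, when many parameters or temporary variables are present, Replace Temp with Query, Introduce Parameter Object, and Preserve Whole Object.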

Complexity

Complex methods like ocrd.resource_manager.OcrdResourceManager.download() often do a lot of different things. To break such a method down, we need to identify a cohesive component within its enclosing class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists, such as Preserve Whole Object, Introduce Parameter Object, and Replace Parameter with Method.

1
from pathlib import Path
from os.path import join
from os import environ, listdir, getcwd, unlink, pathsep
from shutil import copytree, rmtree, copy
from fnmatch import filter as apply_glob
from datetime import datetime
from tarfile import open as open_tarfile
from urllib.parse import urlparse, unquote
from zipfile import ZipFile

import requests
from gdown.parse_url import parse_url as gparse_url
from gdown.download import get_url_from_gdrive_confirmation
from yaml import safe_load, safe_dump

# pylint: disable=wrong-import-position

# https://github.com/OCR-D/core/issues/867
# https://stackoverflow.com/questions/50900727/skip-converting-entities-while-loading-a-yaml-string-using-pyyaml
import yaml.constructor
yaml.constructor.SafeConstructor.yaml_constructors['tag:yaml.org,2002:timestamp'] = \
    yaml.constructor.SafeConstructor.yaml_constructors['tag:yaml.org,2002:str']

# pylint: enable=wrong-import-position

# pylint: enable=wrong-import-position

# pylint: enable=wrong-import-position

from ocrd_validators import OcrdResourceListValidator
from ocrd_utils import getLogger, directory_size, get_moduledir, guess_media_type, config
from ocrd_utils.os import get_processor_resource_types, list_all_resources, pushd_popd, get_ocrd_tool_json
from .constants import RESOURCE_LIST_FILENAME, RESOURCE_USER_LIST_COMMENT
34
35
36
class OcrdResourceManager:
    """
    Managing processor resources

    Keeps an in-memory ``database`` mapping processor executable names to
    lists of resource description dicts. The database is filled from the
    bundled resource list and from the per-user ``resources.yml``; resources
    can be listed, registered and downloaded/copied into a resource directory.
    """

    def __init__(self, userdir=None, xdg_config_home=None, xdg_data_home=None, skip_init=False):
        """
        Set up the manager and, unless ``skip_init`` is set, load the resource lists.

        Args:
            userdir: Override for the user's home directory (falls back to ``config.HOME``).
            xdg_config_home: Override for the XDG config directory
                (falls back to ``config.XDG_CONFIG_HOME``).
            xdg_data_home: Override for the XDG data directory
                (falls back to ``config.XDG_DATA_HOME``).
            skip_init: If true, neither read nor create any resource list file.
        """
        self.log = getLogger('ocrd.resource_manager')
        self.database = {}

        self._xdg_data_home = xdg_data_home
        self._xdg_config_home = xdg_config_home
        self._userdir = userdir
        self.user_list = Path(self.xdg_config_home, 'ocrd', 'resources.yml')

        if not skip_init:
            self.load_resource_list(Path(RESOURCE_LIST_FILENAME))
            if not self.user_list.exists():
                # exist_ok avoids a race between the existence check and the creation
                self.user_list.parent.mkdir(parents=True, exist_ok=True)
                self.save_user_list()
            self.load_resource_list(self.user_list)

    @property
    def userdir(self):
        """Home directory; lazily defaults to (and then caches) ``config.HOME``."""
        if not self._userdir:
            self._userdir = config.HOME
        return self._userdir

    @property
    def xdg_data_home(self):
        """XDG data directory; lazily defaults to (and then caches) ``config.XDG_DATA_HOME``."""
        if not self._xdg_data_home:
            self._xdg_data_home = config.XDG_DATA_HOME
        return self._xdg_data_home

    @property
    def xdg_config_home(self):
        """XDG config directory; ``config.XDG_CONFIG_HOME`` unless overridden (not cached)."""
        if self._xdg_config_home:
            return self._xdg_config_home
        return config.XDG_CONFIG_HOME

    def save_user_list(self, database=None):
        """
        Write ``database`` as YAML to the user resource list, preceded by the
        standard header comment.

        Args:
            database: Mapping to serialize; ``None`` means ``self.database``.
                An explicitly passed empty mapping is written as-is.
        """
        if database is None:
            database = self.database
        with open(self.user_list, 'w', encoding='utf-8') as f:
            f.write(RESOURCE_USER_LIST_COMMENT)
            f.write('\n')
            f.write(safe_dump(database))

    def load_resource_list(self, list_filename, database=None):
        """
        Merge the resource list file ``list_filename`` into ``database``.

        A path that is not a regular file is silently ignored.

        Args:
            list_filename: ``Path`` of the YAML resource list.
            database: Mapping to merge into; ``None`` means ``self.database``.

        Returns:
            The (mutated) database.

        Raises:
            ValueError: If the loaded list fails validation.
        """
        if database is None:
            database = self.database
        if list_filename.is_file():
            with open(list_filename, 'r', encoding='utf-8') as f:
                list_loaded = safe_load(f) or {}
            report = OcrdResourceListValidator.validate(list_loaded)
            if not report.is_valid:
                self.log.error('\n'.join(report.errors))
                raise ValueError(f"Resource list {list_filename} is invalid!")
            for executable, resource_list in list_loaded.items():
                # Prepend, so user provided is sorted before builtin
                database[executable] = resource_list + database.get(executable, [])
        return database

    def list_available(self, executable=None, dynamic=True, name=None, database=None, url=None):
        """
        List models available for download by processor

        Args:
            executable: Glob pattern for processor executable names. If falsy,
                the items view of the whole database is returned.
            dynamic: If true, additionally inspect matching executables found on
                ``PATH`` and insert the resources they declare into the database.
            name: If set, only keep resources with exactly this ``name``.
            database: Mapping to query (and extend); ``None`` means ``self.database``.
            url: If set, only keep resources with exactly this ``url``.

        Returns:
            ``database.items()`` if no executable was given, otherwise a list of
            ``(executable, [resdict, ...])`` tuples; if nothing matched, a single
            tuple with the pattern itself and an empty list.
        """
        if database is None:
            database = self.database
        if not executable:
            return database.items()
        if dynamic:
            skip_executables = ["ocrd-cis-data", "ocrd-import", "ocrd-make"]
            # Use the platform's separator and tolerate an unset PATH
            path_value = environ.get('PATH')
            search_dirs = path_value.split(pathsep) if path_value is not None else []
            for exec_dir in search_dirs:
                for exec_path in Path(exec_dir).glob(f'{executable}'):
                    if not exec_path.name.startswith('ocrd-'):
                        self.log.warning(f"OCR-D processor executable '{exec_path}' has no 'ocrd-' prefix")
                    if exec_path.name in skip_executables:
                        self.log.debug(f"Not an OCR-D processor CLI, skipping '{exec_path}'")
                        continue
                    self.log.debug(f"Inspecting '{exec_path} --dump-json' for resources")
                    ocrd_tool = get_ocrd_tool_json(exec_path)
                    for resdict in ocrd_tool.get('resources', ()):
                        # Front insertion lets these entries win the deduplication below
                        database.setdefault(exec_path.name, []).insert(0, resdict)
            database = self._dedup_database(database)
        found = False
        ret = []
        for k in database:
            if apply_glob([k], executable):
                found = True
                restuple = (k, [])
                ret.append(restuple)
                for resdict in database[k]:
                    if name and resdict['name'] != name:
                        continue
                    if url and resdict['url'] != url:
                        continue
                    restuple[1].append(resdict)
        if not found:
            ret = [(executable, [])]
        return ret

    def list_installed(self, executable=None):
        """
        List installed resources, matching with registry by ``name``

        Args:
            executable: Restrict the listing to this executable. If falsy, all
                executables known to the database or present as ``ocrd-*``
                entries in the resource directories are listed.

        Returns:
            List of ``(executable, [resdict, ...])`` tuples, ordered by executable
            name; every resdict carries a ``path`` key with the location on disk.
        """
        ret = []
        if executable:
            all_executables = [executable]
        else:
            # resources we know about
            all_executables = list(self.database.keys())
            # resources in the file system
            parent_dirs = [join(x, 'ocrd-resources') for x in [self.xdg_data_home, '/usr/local/share']]
            for parent_dir in parent_dirs:
                if Path(parent_dir).exists():
                    all_executables += [x for x in listdir(parent_dir) if x.startswith('ocrd-')]
        # sorted() makes the result order deterministic (set order is arbitrary)
        for this_executable in sorted(set(all_executables)):
            reslist = []
            mimetypes = get_processor_resource_types(this_executable)
            moduledir = get_moduledir(this_executable)
            for res_filename in list_all_resources(this_executable, moduled=moduledir, xdg_data_home=self.xdg_data_home):
                res_filename = Path(res_filename)
                if '*/*' not in mimetypes:
                    if res_filename.is_dir() and 'text/directory' not in mimetypes:
                        continue
                    if res_filename.is_file() and ['text/directory'] == mimetypes:
                        continue
                res_name = res_filename.name
                res_type = 'file' if res_filename.is_file() else 'directory'
                res_size = res_filename.stat().st_size if res_filename.is_file() else directory_size(res_filename)
                resdict_list = [x for x in self.database.get(this_executable, []) if x['name'] == res_name]
                if resdict_list:
                    resdict = resdict_list[0]
                elif str(res_filename.parent) == moduledir:
                    resdict = {
                        'name': res_name,
                        'url': str(res_filename),
                        'description': 'Found at module',
                        'type': res_type,
                        'size': res_size
                    }
                else:
                    resdict = self.add_to_user_database(this_executable, res_filename, resource_type=res_type)
                # NOTE(review): for registry hits this writes 'path' into the dict
                # held by self.database as well (same object)
                resdict['path'] = str(res_filename)
                reslist.append(resdict)
            ret.append((this_executable, reslist))
        return ret

    def add_to_user_database(self, executable, res_filename, url=None, resource_type='file'):
        """
        Add a stub entry to the user resource.yml

        If the user list already has a resource of that name for ``executable``,
        that entry is returned instead of creating a new stub. In both cases the
        user list is re-written and re-loaded into ``self.database``.

        Args:
            executable: Name of the processor executable the resource belongs to.
            res_filename: Path of the resource on disk.
            url: URL to record; ``'???'`` if not given.
            resource_type: Value for the entry's ``type`` field.

        Returns:
            The new or already existing resource dict.
        """
        res_name = Path(res_filename).name
        self.log.info(f"{executable} resource '{res_name}' ({str(res_filename)}) not a known resource, "
                      f"creating stub in {self.user_list}")
        if Path(res_filename).is_dir():
            res_size = directory_size(res_filename)
        else:
            res_size = Path(res_filename).stat().st_size
        with open(self.user_list, 'r', encoding='utf-8') as f:
            user_database = safe_load(f) or {}
        if executable not in user_database:
            user_database[executable] = []
        resources_found = self.list_available(executable=executable, name=res_name, database=user_database)[0][1]
        if not resources_found:
            resdict = {
                'name': res_name,
                'url': url if url else '???',
                'description': f'Found at {self.resource_dir_to_location(res_filename)} on {datetime.now()}',
                'version_range': '???',
                'type': resource_type,
                'size': res_size
            }
            user_database[executable].append(resdict)
        else:
            resdict = resources_found[0]
        self.save_user_list(user_database)
        self.load_resource_list(self.user_list)
        return resdict

    @property
    def default_resource_dir(self):
        """Resource directory of the ``'data'`` location."""
        return self.location_to_resource_dir('data')

    def location_to_resource_dir(self, location):
        """
        Map a location keyword to a directory.

        ``'system'`` maps to ``/usr/local/share/ocrd-resources``, ``'data'`` to
        ``<xdg_data_home>/ocrd-resources``; anything else maps to the current
        working directory.
        """
        if location == 'system':
            return '/usr/local/share/ocrd-resources'
        if location == 'data':
            return join(self.xdg_data_home, 'ocrd-resources')
        return getcwd()

    def resource_dir_to_location(self, resource_path):
        """
        Inverse of :py:meth:`location_to_resource_dir`, by path prefix.

        Returns ``'system'``, ``'data'`` or ``'cwd'`` if ``resource_path`` starts
        with the respective directory, otherwise the path itself (as a string).
        """
        resource_path = str(resource_path)
        if resource_path.startswith('/usr/local/share/ocrd-resources'):
            return 'system'
        if resource_path.startswith(join(self.xdg_data_home, 'ocrd-resources')):
            return 'data'
        if resource_path.startswith(getcwd()):
            return 'cwd'
        return resource_path

    @staticmethod
    def parameter_usage(name, usage='as-is'):
        """
        Convert a resource name to the form used as a parameter value.

        Args:
            name: Resource (file) name.
            usage: ``'as-is'`` returns ``name`` unchanged, ``'without-extension'``
                strips the last suffix.

        Raises:
            ValueError: For any other ``usage``.
        """
        if usage == 'as-is':
            return name
        if usage == 'without-extension':
            return Path(name).stem
        raise ValueError(f"No such usage '{usage}'")

    @staticmethod
    def _download_impl(url, filename, progress_cb=None, size=None):
        """
        Stream ``url`` into the file ``filename``.

        Google Drive links are rewritten to direct-download URLs first. On any
        error the partially written target is removed and the error re-raised.

        Args:
            url: HTTP(S) URL to fetch.
            filename: Target path.
            progress_cb: Optional callable invoked with the byte length of every chunk.
            size: Unused in this implementation.
        """
        log = getLogger('ocrd.resource_manager._download_impl')
        log.info(f"Downloading {url} to {filename}")
        # NOTE(review): the requests below carry no timeout, so a stalled server
        # blocks indefinitely - consider passing `timeout=`
        try:
            gdrive_file_id, is_gdrive_download_link = gparse_url(url, warning=False)
            if gdrive_file_id:
                if not is_gdrive_download_link:
                    url = f"https://drive.google.com/uc?id={gdrive_file_id}"
                try:
                    with requests.get(url, stream=True) as r:
                        # Without this header the response is taken to be a
                        # confirmation page from which the real URL is extracted
                        if "Content-Disposition" not in r.headers:
                            url = get_url_from_gdrive_confirmation(r.text)
                except RuntimeError as e:
                    log.warning("Cannot unwrap Google Drive URL: %s", e)
            with open(filename, 'wb') as f:
                with requests.get(url, stream=True) as r:
                    r.raise_for_status()
                    for data in r.iter_content(chunk_size=4096):
                        if progress_cb:
                            progress_cb(len(data))
                        f.write(data)
        except Exception:
            # Clean up whatever was created at the target, then propagate
            rmtree(filename, ignore_errors=True)
            Path(filename).unlink(missing_ok=True)
            raise

    @staticmethod
    def _copy_file(src, dst, progress_cb=None):
        """
        Copy file ``src`` to ``dst`` in 4096-byte chunks.

        ``progress_cb``, if given, is invoked with the length of every chunk written.
        """
        log = getLogger('ocrd.resource_manager._copy_file')
        log.info(f"Copying file {src} to {dst}")
        with open(dst, 'wb') as f_out, open(src, 'rb') as f_in:
            while chunk := f_in.read(4096):
                f_out.write(chunk)
                if progress_cb:
                    progress_cb(len(chunk))

    @staticmethod
    def _copy_dir(src, dst, progress_cb=None):
        """
        Recursively copy directory ``src`` to ``dst`` (created if missing).

        Every file is copied exactly once, so the byte counts passed to
        ``progress_cb`` add up to the total size of the files copied.

        Raises:
            ValueError: If ``src`` is not a directory.
        """
        log = getLogger('ocrd.resource_manager._copy_dir')
        log.info(f"Copying dir recursively from {src} to {dst}")
        if not Path(src).is_dir():
            raise ValueError(f"The source is not a directory: {src}")
        Path(dst).mkdir(parents=True, exist_ok=True)
        # Only direct children here: the recursive call handles deeper levels.
        # (Combining rglob() with recursion would copy nested files repeatedly.)
        for child in Path(src).iterdir():
            child_dst = Path(dst) / child.name
            if child.is_dir():
                OcrdResourceManager._copy_dir(child, child_dst, progress_cb)
            else:
                OcrdResourceManager._copy_file(child, child_dst, progress_cb)

    @staticmethod
    def _copy_impl(src_filename, filename, progress_cb=None):
        """Copy ``src_filename`` to ``filename``, recursively if it is a directory."""
        log = getLogger('ocrd.resource_manager._copy_impl')
        log.info(f"Copying {src_filename} to {filename}")
        if Path(src_filename).is_dir():
            OcrdResourceManager._copy_dir(src_filename, filename, progress_cb)
        else:
            OcrdResourceManager._copy_file(src_filename, filename, progress_cb)

    # TODO Proper caching (make head request for size, If-Modified etc)
    def download(
        self, executable, url, basedir, overwrite=False, no_subdir=False, name=None, resource_type='file',
        path_in_archive='.', progress_cb=None,
    ):
        """
        Download a resource by URL

        Args:
            executable: Processor executable name; used as sub-directory of
                ``basedir`` unless ``no_subdir`` is set.
            url: ``http(s)://`` URL to download, or a local path to copy from.
            basedir: Base directory for the resource.
            overwrite: Replace an existing target instead of skipping.
            no_subdir: Place the resource directly into ``basedir``.
            name: Target name; defaults to the last path segment of ``url``.
            resource_type: ``'file'`` or ``'directory'`` (fetched as-is) or
                ``'archive'`` (fetched, unpacked, then ``path_in_archive`` is
                copied to the target).
            path_in_archive: Path inside the unpacked archive to install.
            progress_cb: Optional callable invoked with the number of bytes
                transferred per chunk.

        Returns:
            ``Path`` of the target (also when the download was skipped).

        Raises:
            RuntimeError: If an archive is neither zip, gzip nor xz.
        """
        log = getLogger('ocrd.resource_manager.download')
        destdir = Path(basedir) if no_subdir else Path(basedir, executable)
        if not name:
            url_parsed = urlparse(url)
            name = Path(unquote(url_parsed.path)).name
        fpath = Path(destdir, name)
        is_url = url.startswith('https://') or url.startswith('http://')
        if fpath.exists():
            if not overwrite:
                fpath_type = 'Directory' if fpath.is_dir() else 'File'
                log.warning(f"{fpath_type} {fpath} already exists but --overwrite is not set, skipping the download")
                # raise FileExistsError(f"{fpath_type} {fpath} already exists but --overwrite is not set")
                return fpath
            if fpath.is_dir():
                log.info(f"Removing existing target directory {fpath}")
                rmtree(str(fpath))
            else:
                log.info(f"Removing existing target file {fpath}")
                unlink(str(fpath))
        destdir.mkdir(parents=True, exist_ok=True)
        # NOTE(review): any other resource_type falls through both branches, so
        # nothing is fetched and the returned path does not exist
        if resource_type in ('file', 'directory'):
            if is_url:
                self._download_impl(url, fpath, progress_cb)
            else:
                self._copy_impl(url, fpath, progress_cb)
        elif resource_type == 'archive':
            archive_fname = 'download.tar.xx'
            # Work inside a temporary directory; all relative paths below refer to it
            with pushd_popd(tempdir=True) as tempdir:
                if is_url:
                    self._download_impl(url, archive_fname, progress_cb)
                else:
                    self._copy_impl(url, archive_fname, progress_cb)
                Path('out').mkdir()
                with pushd_popd('out'):
                    mimetype = guess_media_type(f'../{archive_fname}', fallback='application/octet-stream')
                    log.info(f"Extracting {mimetype} archive to {tempdir}/out")
                    # SECURITY NOTE(review): extractall() trusts member paths of a
                    # downloaded archive; a crafted archive could write outside
                    # the extraction directory. Consider validating members first.
                    if mimetype == 'application/zip':
                        with ZipFile(f'../{archive_fname}', 'r') as zipf:
                            zipf.extractall()
                    elif mimetype in ('application/gzip', 'application/x-xz'):
                        with open_tarfile(f'../{archive_fname}', 'r:*') as tar:
                            tar.extractall()
                    else:
                        raise RuntimeError(f"Unable to handle extraction of {mimetype} archive {url}")
                    log.info(f"Copying '{path_in_archive}' from archive to {fpath}")
                    if Path(path_in_archive).is_dir():
                        copytree(path_in_archive, str(fpath))
                    else:
                        copy(path_in_archive, str(fpath))
        return fpath

    def _dedup_database(self, database=None, dedup_key='name'):
        """
        Deduplicate resources by name

        For every executable only the first resource per distinct value of
        ``dedup_key`` is kept; the relative order of the kept entries is preserved.

        Args:
            database: Mapping to deduplicate in place; ``None`` means ``self.database``.
            dedup_key: Resource dict key whose value identifies duplicates.

        Returns:
            The (mutated) database.
        """
        if database is None:
            database = self.database
        # Only values of existing keys are reassigned, which is safe during iteration
        for executable, reslist in database.items():
            reslist_dedup = []
            for resdict in reslist:
                if not any(r[dedup_key] == resdict[dedup_key] for r in reslist_dedup):
                    reslist_dedup.append(resdict)
            database[executable] = reslist_dedup
        return database
381