Passed
Pull Request — master (#1319)
by Konstantin
02:12
created

ocrd.resource_manager.OcrdResourceManager.download()   F

Complexity

Conditions 18

Size

Total Lines 57
Code Lines 47

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 47
dl 0
loc 57
rs 1.2
c 0
b 0
f 0
cc 18
nop 10

How to fix   Long Method    Complexity    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

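Commonly applied refactorings include Extract Method and, if many parameters or temporary variables are present, Replace Temp with Query, Introduce Parameter Object and Preserve Whole Object.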

Complexity

Complex classes like ocrd.resource_manager.OcrdResourceManager.download() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

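There are several approaches to avoid long parameter lists, for example Replace Parameter with Method Call, Preserve Whole Object and Introduce Parameter Object.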

1
from logging import Logger
2
from pathlib import Path
3
from os.path import join
4
from os import environ, listdir, getcwd, unlink
5
from shutil import copytree, rmtree, copy
6
from fnmatch import filter as apply_glob
7
from datetime import datetime
8
from tarfile import open as open_tarfile
9
from typing import Dict, Optional
10
from urllib.parse import urlparse, unquote
11
from zipfile import ZipFile
12
13
import requests
14
from gdown.parse_url import parse_url as gparse_url
15
from gdown.download import get_url_from_gdrive_confirmation
16
from yaml import safe_load, safe_dump
17
18
# pylint: disable=wrong-import-position
19
20
# https://github.com/OCR-D/core/issues/867
21
# https://stackoverflow.com/questions/50900727/skip-converting-entities-while-loading-a-yaml-string-using-pyyaml
22
import yaml.constructor
23
yaml.constructor.SafeConstructor.yaml_constructors['tag:yaml.org,2002:timestamp'] = \
24
    yaml.constructor.SafeConstructor.yaml_constructors['tag:yaml.org,2002:str']
25
26
# pylint: enable=wrong-import-position
27
28
# pylint: enable=wrong-import-position
29
30
# pylint: enable=wrong-import-position
31
32
from ocrd_validators import OcrdResourceListValidator
33
from ocrd_utils import getLogger, directory_size, get_moduledir, guess_media_type, config
34
from ocrd_utils.constants import RESOURCES_DIR_SYSTEM, RESOURCE_TYPES, MIME_TO_EXT
35
from ocrd_utils.os import get_processor_resource_types, list_all_resources, pushd_popd, get_ocrd_tool_json
36
from .constants import RESOURCE_USER_LIST_COMMENT
37
38
39
class OcrdResourceManager:
40
41
    """
42
    Managing processor resources
43
    """
44
    def __init__(self, userdir=None, xdg_config_home=None, xdg_data_home=None, skip_init=False):
        """
        Set up the logger, remember the optional directory overrides and
        derive the path of the user resource list.

        Unless ``skip_init`` is set, the user resource list is created
        (together with its parent directory) if it does not exist yet,
        and is then loaded into ``self.database``.
        """
        self.log = getLogger('ocrd.resource_manager')
        self.database = {}

        self._userdir = userdir
        self._xdg_config_home = xdg_config_home
        self._xdg_data_home = xdg_data_home
        self.user_list = Path(self.xdg_config_home, 'ocrd', 'resources.yml')

        self.log.info(f"OcrdResourceManager data home path: {self.xdg_data_home}")
        self.log.info(f"OcrdResourceManager config home path: {self.xdg_config_home}")
        self.log.info(f"OcrdResourceManager user list path: {self.user_list}")

        if skip_init:
            return

        if not self.user_list.exists():
            list_dir = self.user_list.parent
            if not list_dir.exists():
                list_dir.mkdir(parents=True)
            # Write an initial list so that loading below finds a file
            self.save_user_list()
        self.load_resource_list(self.user_list)
63
64
    def __repr__(self):
65
        return f"user_list={str(self.user_list)} " + \
66
               f"exists={self.user_list.exists()} " + \
67
               f"database: {len(self.database)} executables " + \
68
               f"{sum(map(len, self.database.values()))} resources"
69
70
    @property
71
    def userdir(self):
72
        if not self._userdir:
73
            self._userdir = config.HOME
74
        return self._userdir
75
76
    @property
77
    def xdg_data_home(self):
78
        if not self._xdg_data_home:
79
            self._xdg_data_home = config.XDG_DATA_HOME
80
        return self._xdg_data_home
81
82
    @property
83
    def xdg_config_home(self):
84
        if not self._xdg_config_home:
85
            self._xdg_config_home = config.XDG_CONFIG_HOME
86
        return self._xdg_config_home
87
88
    def save_user_list(self, database=None):
        """
        Write a resource database to the user resource list file.

        The file starts with ``RESOURCE_USER_LIST_COMMENT`` and a newline,
        followed by the YAML dump of the database after it has been passed
        through ``_dedup_database``.

        Args:
            database: mapping to write; ``self.database`` is used when ``None``.
        """
        # Only fall back when nothing was passed: an explicitly passed empty
        # mapping must not be silently replaced by self.database.
        if database is None:
            database = self.database
        self.log.info(f"Saving resources to path: {self.user_list}")
        # Deduplicate the mapping that is actually written
        self._dedup_database(database)
        with open(self.user_list, 'w', encoding='utf-8') as f:
            f.write(RESOURCE_USER_LIST_COMMENT)
            f.write('\n')
            f.write(safe_dump(database))
97
98
    def load_resource_list(self, list_filename: Path, database=None):
99
        self.log.info(f"Loading resources from path: {list_filename}")
100
        if not database:
101
            database = self.database
102
        if list_filename.is_file():
103
            with open(list_filename, 'r', encoding='utf-8') as f:
104
                list_loaded = safe_load(f) or {}
105
            report = OcrdResourceListValidator.validate(list_loaded)
106
            if not report.is_valid:
107
                self.log.error('\n'.join(report.errors))
108
                raise ValueError(f"Resource list {list_filename} is invalid!")
109
            for executable, resource_list in list_loaded.items():
110
                if executable not in database:
111
                    database[executable] = []
112
                # Prepend, so user provided is sorted before builtin
113
                database[executable] = list_loaded[executable] + database[executable]
114
        return database
115
116
    def _search_executables(self, executable: Optional[str]):
117
        skip_executables = ["ocrd-cis-data", "ocrd-import", "ocrd-make"]
118
        for exec_dir in environ['PATH'].split(':'):
119
            self.log.debug(f"Searching for executables inside path: {exec_dir}")
120
            for exec_path in Path(exec_dir).glob(f'{executable}'):
121
                if not exec_path.name.startswith('ocrd-'):
122
                    self.log.warning(f"OCR-D processor executable '{exec_path}' has no 'ocrd-' prefix")
123
                if exec_path.name in skip_executables:
124
                    self.log.debug(f"Not an OCR-D processor CLI, skipping '{exec_path}'")
125
                    continue
126
                self.log.debug(f"Inspecting '{exec_path} --dump-json' for resources")
127
                ocrd_tool = get_ocrd_tool_json(exec_path)
128
                for res_dict in ocrd_tool.get('resources', ()):
129
                    if exec_path.name not in self.database:
130
                        self.database[exec_path.name] = []
131
                    self.database[exec_path.name].insert(0, res_dict)
132
133
    def list_available(
134
        self, executable: str = None, dynamic: bool = True, name: str = None, database: Dict = None, url: str = None
135
    ):
136
        """
137
        List models available for download by processor
138
        """
139
        if not database:
140
            database = self.database
141
        if not executable:
142
            return list(database.items())
143
        if dynamic:
144
            self._search_executables(executable)
145
            self.save_user_list()
146
        found = False
147
        ret = []
148
        for k in database:
149
            if apply_glob([k], executable):
150
                found = True
151
                restuple = (k, [])
152
                ret.append(restuple)
153
                for resdict in database[k]:
154
                    if name and resdict['name'] != name:
155
                        continue
156
                    if url and resdict['url'] != url:
157
                        continue
158
                    restuple[1].append(resdict)
159
        if not found:
160
            ret = [(executable, [])]
161
        return ret
162
163
    def list_installed(self, executable: str = None):
        """
        List installed resources, matching with registry by ``name``

        When ``executable`` is not given, all executables known to the
        database plus all ``ocrd-*`` entries found in the data and system
        resource directories are inspected.

        Each installed resource is described by (in this order of preference)
        its registry entry with the same name, an ad-hoc entry if it lives
        under the module directory, or a stub added to the user database.

        Returns:
            A list of ``(executable, resources)`` tuples.
        """
        ret = []
        if executable:
            all_executables = [executable]
        else:
            # resources we know about
            all_executables = list(self.database.keys())
            # resources in the file system
            parent_dirs = [f"{join(self.xdg_data_home, 'ocrd-resources')}", RESOURCES_DIR_SYSTEM]
            for parent_dir in parent_dirs:
                if Path(parent_dir).exists():
                    all_executables += [x for x in listdir(parent_dir) if x.startswith('ocrd-')]
        for this_executable in set(all_executables):
            reslist = []
            moduledir = get_moduledir(this_executable)
            resdict_list = self.list_available(executable=this_executable)[0][1]
            for res_filename in list_all_resources(this_executable,
                                                   moduled=moduledir,
                                                   xdg_data_home=self.xdg_data_home):
                res_filename = Path(res_filename).resolve()
                res_name = res_filename.name
                is_file = res_filename.is_file()
                res_type = 'file' if is_file else 'directory'
                res_size = res_filename.stat().st_size if is_file else directory_size(res_filename)
                if resdict := next((res for res in resdict_list if res['name'] == res_name), False):
                    pass
                # Guard against a missing module directory: startswith(None)
                # would raise a TypeError instead of falling through.
                elif moduledir and str(res_filename.parent).startswith(str(moduledir)):
                    resdict = {
                        'name': res_name,
                        'url': str(res_filename),
                        'description': 'Found at module',
                        'type': res_type,
                        'size': res_size
                    }
                else:
                    resdict = self.add_to_user_database(this_executable, res_filename, resource_type=res_type)
                # resdict['path'] = str(res_filename)
                reslist.append(resdict)
            ret.append((this_executable, reslist))
        self.save_user_list()
        return ret
206
207
    def add_to_user_database(self, executable, res_filename, url=None, resource_type='file'):
        """
        Add a stub entry to the user resource.yml

        The user list is loaded, and if it has no resource named like
        ``res_filename`` for ``executable``, a stub entry (with ``'???'``
        placeholders for unknown fields) is appended. The list is then saved
        and loaded again.

        Returns:
            The existing or newly created resource dict.
        """
        res_name = res_filename.name
        res_path = Path(res_filename)
        res_size = directory_size(res_filename) if res_path.is_dir() else res_path.stat().st_size

        user_database = self.load_resource_list(self.user_list)
        user_database.setdefault(executable, [])
        resources_found = self.list_available(executable=executable, name=res_name, database=user_database)[0][1]
        if resources_found:
            resdict = resources_found[0]
        else:
            self.log.info(f"{executable} resource '{res_name}' ({str(res_filename)}) not a known resource, "
                          f"creating stub in {self.user_list}'")
            resdict = {
                'name': res_name,
                'url': url if url else '???',
                'description': f'Found at {self.resource_dir_to_location(res_filename)} on {datetime.now()}',
                'version_range': '???',
                'type': resource_type,
                'size': res_size
            }
            user_database[executable].append(resdict)
        self.save_user_list(user_database)
        self.load_resource_list(self.user_list)
        return resdict
237
238
    @property
239
    def default_resource_dir(self):
240
        return self.location_to_resource_dir('data')
241
242
    def location_to_resource_dir(self, location: str) -> str:
243
        if location == 'data':
244
            return join(self.xdg_data_home, 'ocrd-resources')
245
        if location == 'system':
246
            return RESOURCES_DIR_SYSTEM
247
        return getcwd()
248
249
    def resource_dir_to_location(self, resource_path: Path) -> str:
        """
        Map a resource path back to a location keyword by string prefix.

        Checks, in this order, ``RESOURCES_DIR_SYSTEM`` (``'system'``),
        ``<xdg_data_home>/ocrd-resources`` (``'data'``) and the current
        working directory (``'cwd'``). If none is a prefix, the path itself
        is returned as a string.
        """
        path_str = str(resource_path)
        if path_str.startswith(RESOURCES_DIR_SYSTEM):
            return 'system'
        data_dir = join(self.xdg_data_home, 'ocrd-resources')
        if path_str.startswith(data_dir):
            return 'data'
        return 'cwd' if path_str.startswith(getcwd()) else path_str
258
259
    def build_resource_dest_dir(self, location: str, executable: str) -> Path:
260
        if location == 'module':
261
            base_dir = get_moduledir(executable)
262
            if not base_dir:
263
                base_dir = self.location_to_resource_dir('data')
264
        else:
265
            base_dir = self.location_to_resource_dir(location)
266
        no_subdir = location in ['cwd', 'module']
267
        dest_dir = Path(base_dir) if no_subdir else Path(base_dir, executable)
268
        return dest_dir
269
270
    @staticmethod
271
    def remove_resource(log: Logger, resource_path: Path):
272
        if resource_path.is_dir():
273
            log.info(f"Removing existing target resource directory {resource_path}")
274
            rmtree(str(resource_path))
275
        else:
276
            log.info(f"Removing existing target resource file {resource_path}")
277
            unlink(str(resource_path))
278
279
    @staticmethod
280
    def parameter_usage(name: str, usage: str = 'as-is') -> str:
281
        if usage == 'as-is':
282
            return name
283
        elif usage == 'without-extension':
284
            return Path(name).stem
285
        raise ValueError(f"No such usage '{usage}'")
286
287
    @staticmethod
    def _download_impl(log: Logger, url: str, filename):
        """
        Stream the content behind ``url`` into ``filename``.

        Google Drive URLs (as recognized by ``gparse_url``) are first
        rewritten to a direct download link; a failure to unwrap the
        confirmation page is only logged. On any error, whatever exists at
        ``filename`` is removed before the exception is re-raised.
        """
        log.info(f"Downloading {url} to {filename}")
        try:
            gdrive_file_id, is_gdrive_download_link = gparse_url(url, warning=False)
            if gdrive_file_id:
                if not is_gdrive_download_link:
                    url = f"https://drive.google.com/uc?id={gdrive_file_id}"
                try:
                    with requests.get(url, stream=True) as probe:
                        # No Content-Disposition header: presumably a confirmation page
                        if "Content-Disposition" not in probe.headers:
                            url = get_url_from_gdrive_confirmation(probe.text)
                except RuntimeError as e:
                    log.warning(f"Cannot unwrap Google Drive URL: {e}")
            with open(filename, 'wb') as out_file, requests.get(url, stream=True) as response:
                response.raise_for_status()
                for data in response.iter_content(chunk_size=4096):
                    out_file.write(data)
        except Exception:
            # Clean up a partial download, whether it is a directory or a file
            rmtree(filename, ignore_errors=True)
            Path(filename).unlink(missing_ok=True)
            raise
310
311
    @staticmethod
312
    def _copy_file(log: Logger, src, dst):
313
        log.info(f"Copying file {src} to {dst}")
314
        with open(dst, 'wb') as f_out, open(src, 'rb') as f_in:
315
            while True:
316
                chunk = f_in.read(4096)
317
                if chunk:
318
                    f_out.write(chunk)
319
                else:
320
                    break
321
322
    @staticmethod
323
    def _copy_dir(log: Logger, src, dst):
324
        log.info(f"Copying dir recursively from {src} to {dst}")
325
        if not Path(src).is_dir():
326
            raise ValueError(f"The source is not a directory: {src}")
327
        Path(dst).mkdir(parents=True, exist_ok=True)
328
        for child in Path(src).rglob('*'):
329
            child_dst = Path(dst) / child.relative_to(src)
330
            if Path(child).is_dir():
331
                OcrdResourceManager._copy_dir(log, child, child_dst)
332
            else:
333
                OcrdResourceManager._copy_file(log, child, child_dst)
334
335
    @staticmethod
336
    def _copy_impl(log: Logger, src_filename, filename):
337
        log.info(f"Copying {src_filename} to {filename}")
338
        if Path(src_filename).is_dir():
339
            OcrdResourceManager._copy_dir(log, src_filename, filename)
340
        else:
341
            OcrdResourceManager._copy_file(log, src_filename, filename)
342
343
    @staticmethod
    def _extract_archive(log: Logger, tempdir: Path, path_in_archive: str, fpath: Path, archive_fname: str):
        """
        Extract an archive into a fresh ``out`` directory and copy one of its
        members to ``fpath``.

        The archive is looked up as ``../<archive_fname>`` from inside ``out``.
        ZIP, gzip and xz media types are supported.

        Raises:
            RuntimeError: if the guessed media type is none of the above.
        """
        Path('out').mkdir()
        with pushd_popd('out'):
            archive_path = f'../{archive_fname}'
            mimetype = guess_media_type(archive_path, fallback='application/octet-stream')
            log.info(f"Extracting {mimetype} archive to {tempdir}/out")
            # NOTE(review): extractall() does not restrict member paths here;
            # consider validating members if archives can be untrusted.
            if mimetype == 'application/zip':
                with ZipFile(archive_path, 'r') as zipf:
                    zipf.extractall()
            elif mimetype in ('application/gzip', 'application/x-xz'):
                with open_tarfile(archive_path, 'r:*') as tar:
                    tar.extractall()
            else:
                raise RuntimeError(f"Unable to handle extraction of {mimetype} archive")
            log.info(f"Copying '{path_in_archive}' from archive to {fpath}")
            copier = copytree if Path(path_in_archive).is_dir() else copy
            copier(path_in_archive, str(fpath))
362
363
    def copy_resource(
364
        self, log: Logger, url: str, fpath: Path, resource_type: str = 'file', path_in_archive: str = '.'
365
    ) -> Path:
366
        """
367
        Copy a local resource to another destination
368
        """
369
        if resource_type == 'archive':
370
            archive_fname = 'download.tar.xx'
371
            with pushd_popd(tempdir=True) as tempdir:
372
                self._copy_impl(log, url, archive_fname)
373
                self._extract_archive(log, tempdir, path_in_archive, fpath, archive_fname)
374
        else:
375
            self._copy_impl(log, url, fpath)
376
        return fpath
377
378
    def download_resource(
        self, log: Logger, url: str, fpath: Path, resource_type: str = 'file', path_in_archive: str = '.'
    ) -> Path:
        """
        Download a resource by URL to a destination directory

        For the ``'archive'`` type, the download goes into a temporary
        directory and ``path_in_archive`` is extracted to ``fpath``;
        otherwise the download is written to ``fpath`` directly.

        Returns:
            ``fpath``
        """
        if resource_type != 'archive':
            self._download_impl(log, url, fpath)
            return fpath
        archive_fname = 'download.tar.xx'
        with pushd_popd(tempdir=True) as tempdir:
            self._download_impl(log, url, archive_fname)
            self._extract_archive(log, tempdir, path_in_archive, fpath, archive_fname)
        return fpath
392
393
    # TODO Proper caching (make head request for size, If-Modified etc)
394
    def handle_resource(
        self, res_dict: Dict, executable: str, dest_dir: Path, any_url: str, overwrite: bool = False,
        resource_type: str = 'file', path_in_archive: str = '.'
    ) -> Optional[Path]:
        """
        Download or Copy a resource by URL to a destination directory

        Values in ``res_dict`` (``type``, ``name``, ``path_in_archive``) take
        precedence over the corresponding arguments; ``any_url``, if truthy,
        overrides ``res_dict['url']``. ``res_dict`` is updated in place with
        the URL and size that were used.

        Returns:
            The path of the installed resource, the existing path if it is
            already present and ``overwrite`` is not set, or ``None`` if the
            resource has the placeholder URL ``'???'``.

        Raises:
            ValueError: if the resource type is not in ``RESOURCE_TYPES``.
        """
        log = getLogger('ocrd.resource_manager.handle_resource')
        # Determined up front, before 'size' may be filled in below
        is_registered = "size" in res_dict
        registered = "registered" if is_registered else "unregistered"
        resource_type = res_dict.get('type', resource_type)
        resource_name = res_dict.get('name')
        path_in_archive = res_dict.get('path_in_archive', path_in_archive)

        if resource_type not in RESOURCE_TYPES:
            raise ValueError(f"Unknown resource type: {resource_type}, must be one of: {RESOURCE_TYPES}")
        if any_url:
            res_dict['url'] = any_url
        if not resource_name:
            # Derive the name from the URL path, or from the archive member
            url_parsed = urlparse(res_dict['url'])
            resource_name = Path(unquote(url_parsed.path)).name
            if resource_type == 'archive' and path_in_archive != '.':
                resource_name = Path(path_in_archive).name
        if res_dict['url'] == '???':
            log.warning(f"Skipping user resource {resource_name} since download url is: {res_dict['url']}")
            return None

        fpath = Path(dest_dir, resource_name)
        if fpath.exists():
            if not overwrite:
                fpath_type = 'Directory' if fpath.is_dir() else 'File'
                log.warning(f"{fpath_type} {fpath} already exists but --overwrite is not set, skipping the download")
                return fpath
            self.remove_resource(log, resource_path=fpath)
        dest_dir.mkdir(parents=True, exist_ok=True)

        # TODO @mehmedGIT: Consider properly handling cases for invalid URLs.
        if res_dict['url'].startswith(('https://', 'http://')):
            log.info(f"Downloading {registered} resource '{resource_name}' ({res_dict['url']})")
            if 'size' not in res_dict:
                with requests.head(res_dict['url']) as r:
                    res_dict['size'] = int(r.headers.get('content-length', 0))
            fpath = self.download_resource(log, res_dict['url'], fpath, resource_type, path_in_archive)
        else:
            log.info(f"Copying {registered} resource '{resource_name}' ({res_dict['url']})")
            urlpath = Path(res_dict['url'])
            res_dict['url'] = str(urlpath.resolve())
            res_dict['size'] = directory_size(urlpath) if Path(urlpath).is_dir() else urlpath.stat().st_size
            fpath = self.copy_resource(log, res_dict['url'], fpath, resource_type, path_in_archive)

        if not is_registered:
            self.add_to_user_database(executable, fpath, url=res_dict['url'])
        self.save_user_list()
        log.info(f"Installed resource {res_dict['url']} under {fpath}")
        return fpath
449
450
    def _dedup_database(self, database=None, dedup_key='name'):
451
        """
452
        Deduplicate resources by name
453
        """
454
        if not database:
455
            database = self.database
456
        for executable, reslist in database.items():
457
            reslist_dedup = []
458
            for resdict in reslist:
459
                if not any(r[dedup_key] == resdict[dedup_key] for r in reslist_dedup):
460
                    reslist_dedup.append(resdict)
461
            database[executable] = reslist_dedup
462
        return database
463