Passed
Push — master ( 96c9ad...2330e7 )
by Konstantin
03:04
created

ocrd.resource_manager.OcrdResourceManager.download()   F

Complexity

Conditions 18

Size

Total Lines 57
Code Lines 47

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 47
dl 0
loc 57
rs 1.2
c 0
b 0
f 0
cc 18
nop 10

How to fix   Long Method    Complexity    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

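Commonly applied refactorings include Extract Method and, when the method carries many parameters or temporary variables, Replace Temp with Query, Introduce Parameter Object, and Preserve Whole Object.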

Complexity

Complex methods like ocrd.resource_manager.OcrdResourceManager.download() often do a lot of different things. To break such a method (or the class around it) down, we need to identify a cohesive component within it. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists: Replace Parameter with Method, Introduce Parameter Object, and Preserve Whole Object.

1
from logging import Logger
2
from pathlib import Path
3
from os.path import join
4
from os import environ, listdir, getcwd, unlink
5
from shutil import copytree, rmtree, copy
6
from fnmatch import filter as apply_glob
7
from datetime import datetime
8
from tarfile import open as open_tarfile
9
from typing import Dict, Optional
10
from urllib.parse import urlparse, unquote
11
from zipfile import ZipFile
12
13
import requests
14
from gdown.parse_url import parse_url as gparse_url
15
from gdown.download import get_url_from_gdrive_confirmation
16
from yaml import safe_load, safe_dump
17
18
# pylint: disable=wrong-import-position
19
20
# https://github.com/OCR-D/core/issues/867
21
# https://stackoverflow.com/questions/50900727/skip-converting-entities-while-loading-a-yaml-string-using-pyyaml
22
import yaml.constructor
# Register the plain-string constructor for the YAML timestamp tag, so that
# scalars tagged as timestamps are constructed the same way as strings.
yaml.constructor.SafeConstructor.yaml_constructors['tag:yaml.org,2002:timestamp'] = (
    yaml.constructor.SafeConstructor.yaml_constructors['tag:yaml.org,2002:str']
)
25
26
# pylint: enable=wrong-import-position
27
28
# pylint: enable=wrong-import-position
29
30
# pylint: enable=wrong-import-position
31
32
from ocrd_validators import OcrdResourceListValidator
33
from ocrd_utils import getLogger, directory_size, get_moduledir, guess_media_type, config
34
from ocrd_utils.constants import RESOURCES_DIR_SYSTEM, RESOURCE_TYPES, MIME_TO_EXT
35
from ocrd_utils.os import get_processor_resource_types, list_all_resources, pushd_popd, get_ocrd_tool_json
36
from .constants import RESOURCE_LIST_FILENAME, RESOURCE_USER_LIST_COMMENT
37
38
39
class OcrdResourceManager:
40
41
    """
42
    Managing processor resources
43
    """
44
    def __init__(self, userdir=None, xdg_config_home=None, xdg_data_home=None, skip_init=False):
        """
        Set up logger, empty database and paths; optionally load the resource lists

        Args:
            userdir: used instead of ``config.HOME`` when truthy
            xdg_config_home: used instead of ``config.XDG_CONFIG_HOME`` when truthy
            xdg_data_home: used instead of ``config.XDG_DATA_HOME`` when truthy
            skip_init: if true, neither load any resource list nor create the
                user list file
        """
        self.log = getLogger('ocrd.resource_manager')
        self.database = {}

        self._xdg_data_home = xdg_data_home
        self._xdg_config_home = xdg_config_home
        self._userdir = userdir
        self.user_list = Path(self.xdg_config_home, 'ocrd', 'resources.yml')

        self.log.info(f"OcrdResourceManager data home path: {self.xdg_data_home}")
        self.log.info(f"OcrdResourceManager config home path: {self.xdg_config_home}")
        self.log.info(f"OcrdResourceManager user list path: {self.user_list}")

        if skip_init:
            return

        # Load RESOURCE_LIST_FILENAME first; the user list is loaded afterwards
        # and load_resource_list prepends, so user entries end up in front.
        self.load_resource_list(Path(RESOURCE_LIST_FILENAME))
        if not self.user_list.exists():
            # exist_ok=True: do not fail if the directory already exists or is
            # created by someone else between a check and the mkdir call.
            self.user_list.parent.mkdir(parents=True, exist_ok=True)
            self.save_user_list()
        self.load_resource_list(self.user_list)
64
65
    def __repr__(self):
66
        return f"user_list={str(self.user_list)} " + \
67
               f"exists={self.user_list.exists()} " + \
68
               f"database: {len(self.database)} executables " + \
69
               f"{sum(map(len, self.database.values()))} resources"
70
71
    @property
72
    def userdir(self):
73
        if not self._userdir:
74
            self._userdir = config.HOME
75
        return self._userdir
76
77
    @property
78
    def xdg_data_home(self):
79
        if not self._xdg_data_home:
80
            self._xdg_data_home = config.XDG_DATA_HOME
81
        return self._xdg_data_home
82
83
    @property
84
    def xdg_config_home(self):
85
        if not self._xdg_config_home:
86
            self._xdg_config_home = config.XDG_CONFIG_HOME
87
        return self._xdg_config_home
88
89
    def save_user_list(self, database=None):
        """
        Write a resource database to the user list file as YAML

        The file consists of ``RESOURCE_USER_LIST_COMMENT``, a newline and the
        ``safe_dump`` of the database.

        Args:
            database: mapping of executable name to list of resource dicts;
                ``self.database`` is used when this is ``None``
        """
        if database is None:
            database = self.database
        self.log.info(f"Saving resources to path: {self.user_list}")
        # Deduplicate the database that is actually written, not
        # unconditionally self.database.
        self._dedup_database(database)
        with open(self.user_list, 'w', encoding='utf-8') as f:
            f.write(RESOURCE_USER_LIST_COMMENT)
            f.write('\n')
            f.write(safe_dump(database))
98
99
    def load_resource_list(self, list_filename: Path, database=None):
100
        self.log.info(f"Loading resources from path: {list_filename}")
101
        if not database:
102
            database = self.database
103
        if list_filename.is_file():
104
            with open(list_filename, 'r', encoding='utf-8') as f:
105
                list_loaded = safe_load(f) or {}
106
            report = OcrdResourceListValidator.validate(list_loaded)
107
            if not report.is_valid:
108
                self.log.error('\n'.join(report.errors))
109
                raise ValueError(f"Resource list {list_filename} is invalid!")
110
            for executable, resource_list in list_loaded.items():
111
                if executable not in database:
112
                    database[executable] = []
113
                # Prepend, so user provided is sorted before builtin
114
                database[executable] = list_loaded[executable] + database[executable]
115
        return database
116
117
    def _search_executables(self, executable: Optional[str]):
118
        skip_executables = ["ocrd-cis-data", "ocrd-import", "ocrd-make"]
119
        for exec_dir in environ['PATH'].split(':'):
120
            self.log.debug(f"Searching for executables inside path: {exec_dir}")
121
            for exec_path in Path(exec_dir).glob(f'{executable}'):
122
                if not exec_path.name.startswith('ocrd-'):
123
                    self.log.warning(f"OCR-D processor executable '{exec_path}' has no 'ocrd-' prefix")
124
                if exec_path.name in skip_executables:
125
                    self.log.debug(f"Not an OCR-D processor CLI, skipping '{exec_path}'")
126
                    continue
127
                self.log.debug(f"Inspecting '{exec_path} --dump-json' for resources")
128
                ocrd_tool = get_ocrd_tool_json(exec_path)
129
                for res_dict in ocrd_tool.get('resources', ()):
130
                    if exec_path.name not in self.database:
131
                        self.database[exec_path.name] = []
132
                    self.database[exec_path.name].insert(0, res_dict)
133
134
    def list_available(
135
        self, executable: str = None, dynamic: bool = True, name: str = None, database: Dict = None, url: str = None
136
    ):
137
        """
138
        List models available for download by processor
139
        """
140
        if not database:
141
            database = self.database
142
        if not executable:
143
            return list(database.items())
144
        if dynamic:
145
            self._search_executables(executable)
146
            self.save_user_list()
147
        found = False
148
        ret = []
149
        for k in database:
150
            if apply_glob([k], executable):
151
                found = True
152
                restuple = (k, [])
153
                ret.append(restuple)
154
                for resdict in database[k]:
155
                    if name and resdict['name'] != name:
156
                        continue
157
                    if url and resdict['url'] != url:
158
                        continue
159
                    restuple[1].append(resdict)
160
        if not found:
161
            ret = [(executable, [])]
162
        return ret
163
164
    def list_installed(self, executable: str = None):
        """
        List installed resources, matching with registry by ``name``

        Args:
            executable: restrict the listing to this executable; if falsy, all
                executables in the database plus every ``ocrd-*`` entry found
                in the data and system resource directories are listed

        Returns:
            list of ``(executable, [resource dict, ...])`` tuples. Files not
            known to the registry are described by an ad-hoc dict when they
            live under the module directory, otherwise they are added to the
            user database as stubs. The user list is saved before returning.
        """
        if executable:
            all_executables = [executable]
        else:
            # resources we know about
            all_executables = list(self.database.keys())
            # resources in the file system
            parent_dirs = [f"{join(self.xdg_data_home, 'ocrd-resources')}", RESOURCES_DIR_SYSTEM]
            for parent_dir in parent_dirs:
                if Path(parent_dir).exists():
                    all_executables += [x for x in listdir(parent_dir) if x.startswith('ocrd-')]
        ret = []
        for this_executable in set(all_executables):
            reslist = []
            moduledir = get_moduledir(this_executable)
            resdict_list = self.list_available(executable=this_executable)[0][1]
            for res_filename in list_all_resources(this_executable,
                                                   moduled=moduledir,
                                                   xdg_data_home=self.xdg_data_home):
                res_filename = Path(res_filename).resolve()
                res_name = res_filename.name
                is_file = res_filename.is_file()
                res_type = 'file' if is_file else 'directory'
                res_size = res_filename.stat().st_size if is_file else directory_size(res_filename)
                if resdict := next((res for res in resdict_list if res['name'] == res_name), False):
                    pass
                # get_moduledir may yield None (build_resource_dest_dir handles
                # a falsy result too); str.startswith(None) would raise TypeError.
                elif moduledir is not None and str(res_filename.parent).startswith(moduledir):
                    resdict = {
                        'name': res_name,
                        'url': str(res_filename),
                        'description': 'Found at module',
                        'type': res_type,
                        'size': res_size
                    }
                else:
                    resdict = self.add_to_user_database(this_executable, res_filename, resource_type=res_type)
                # resdict['path'] = str(res_filename)
                reslist.append(resdict)
            ret.append((this_executable, reslist))
        self.save_user_list()
        return ret
207
208
    def add_to_user_database(self, executable, res_filename, url=None, resource_type='file'):
        """
        Add a stub entry to the user resource.yml

        Args:
            executable: executable the resource belongs to
            res_filename: path of the installed resource (``str`` or ``Path``)
            url: URL to record in a new stub; ``'???'`` is stored when falsy
            resource_type: value for the ``type`` field of a new stub

        Returns:
            the already known resource dict with the same name if there is
            one, otherwise the newly created stub
        """
        # Normalize once so that both str and Path arguments work below
        res_filename = Path(res_filename)
        res_name = res_filename.name
        if res_filename.is_dir():
            res_size = directory_size(res_filename)
        else:
            res_size = res_filename.stat().st_size
        user_database = self.load_resource_list(self.user_list)
        user_database.setdefault(executable, [])
        resources_found = self.list_available(executable=executable, name=res_name, database=user_database)[0][1]
        if resources_found:
            resdict = resources_found[0]
        else:
            self.log.info(f"{executable} resource '{res_name}' ({str(res_filename)}) not a known resource, "
                          f"creating stub in {self.user_list}'")
            resdict = {
                'name': res_name,
                'url': url if url else '???',
                'description': f'Found at {self.resource_dir_to_location(res_filename)} on {datetime.now()}',
                'version_range': '???',
                'type': resource_type,
                'size': res_size
            }
            user_database[executable].append(resdict)
        self.save_user_list(user_database)
        self.load_resource_list(self.user_list)
        return resdict
238
239
    @property
240
    def default_resource_dir(self):
241
        return self.location_to_resource_dir('data')
242
243
    def location_to_resource_dir(self, location: str) -> str:
244
        if location == 'data':
245
            return join(self.xdg_data_home, 'ocrd-resources')
246
        if location == 'system':
247
            return RESOURCES_DIR_SYSTEM
248
        return getcwd()
249
250
    def resource_dir_to_location(self, resource_path: Path) -> str:
        """
        Map a resource path back to a location keyword

        The string form of the path is tested by prefix, in this order:
        ``RESOURCES_DIR_SYSTEM`` gives ``'system'``,
        ``<xdg_data_home>/ocrd-resources`` gives ``'data'``, the current
        working directory gives ``'cwd'``. If none matches, the path string
        itself is returned.
        """
        path_text = str(resource_path)
        if path_text.startswith(RESOURCES_DIR_SYSTEM):
            location = 'system'
        elif path_text.startswith(join(self.xdg_data_home, 'ocrd-resources')):
            location = 'data'
        elif path_text.startswith(getcwd()):
            location = 'cwd'
        else:
            location = path_text
        return location
259
260
    def build_resource_dest_dir(self, location: str, executable: str) -> Path:
261
        if location == 'module':
262
            base_dir = get_moduledir(executable)
263
            if not base_dir:
264
                base_dir = self.location_to_resource_dir('data')
265
        else:
266
            base_dir = self.location_to_resource_dir(location)
267
        no_subdir = location in ['cwd', 'module']
268
        dest_dir = Path(base_dir) if no_subdir else Path(base_dir, executable)
269
        return dest_dir
270
271
    @staticmethod
272
    def remove_resource(log: Logger, resource_path: Path):
273
        if resource_path.is_dir():
274
            log.info(f"Removing existing target resource directory {resource_path}")
275
            rmtree(str(resource_path))
276
        else:
277
            log.info(f"Removing existing target resource file {resource_path}")
278
            unlink(str(resource_path))
279
280
    @staticmethod
281
    def parameter_usage(name: str, usage: str = 'as-is') -> str:
282
        if usage == 'as-is':
283
            return name
284
        elif usage == 'without-extension':
285
            return Path(name).stem
286
        raise ValueError(f"No such usage '{usage}'")
287
288
    @staticmethod
    def _download_impl(log: Logger, url: str, filename):
        """
        Stream the content behind ``url`` into ``filename``

        If ``gparse_url`` recognizes a Google Drive file id, the URL is first
        rewritten to the ``uc?id=`` form (unless it already is a download
        link). When the response to that URL carries no ``Content-Disposition``
        header, the URL is replaced by what
        ``get_url_from_gdrive_confirmation`` extracts from the response text.
        A ``RuntimeError`` during that unwrapping is only logged.

        On any exception, whatever exists at ``filename`` (directory tree or
        file) is removed and the exception is re-raised.

        Raises:
            whatever ``requests`` or the file operations raise, including
            the HTTP error from ``raise_for_status``
        """
        log.info(f"Downloading {url} to {filename}")
        try:
            gdrive_file_id, is_gdrive_download_link = gparse_url(url, warning=False)
            if gdrive_file_id:
                if not is_gdrive_download_link:
                    url = f"https://drive.google.com/uc?id={gdrive_file_id}"
                try:
                    with requests.get(url, stream=True) as r:
                        if "Content-Disposition" not in r.headers:
                            url = get_url_from_gdrive_confirmation(r.text)
                except RuntimeError as e:
                    log.warning(f"Cannot unwrap Google Drive URL: {e}")
            with open(filename, 'wb') as f, requests.get(url, stream=True) as r:
                r.raise_for_status()
                for data in r.iter_content(chunk_size=4096):
                    f.write(data)
        except Exception:
            # Do not leave a partial download behind
            rmtree(filename, ignore_errors=True)
            Path(filename).unlink(missing_ok=True)
            # Bare raise re-raises the active exception with its traceback intact
            raise
311
312
    @staticmethod
313
    def _copy_file(log: Logger, src, dst):
314
        log.info(f"Copying file {src} to {dst}")
315
        with open(dst, 'wb') as f_out, open(src, 'rb') as f_in:
316
            while True:
317
                chunk = f_in.read(4096)
318
                if chunk:
319
                    f_out.write(chunk)
320
                else:
321
                    break
322
323
    @staticmethod
324
    def _copy_dir(log: Logger, src, dst):
325
        log.info(f"Copying dir recursively from {src} to {dst}")
326
        if not Path(src).is_dir():
327
            raise ValueError(f"The source is not a directory: {src}")
328
        Path(dst).mkdir(parents=True, exist_ok=True)
329
        for child in Path(src).rglob('*'):
330
            child_dst = Path(dst) / child.relative_to(src)
331
            if Path(child).is_dir():
332
                OcrdResourceManager._copy_dir(log, child, child_dst)
333
            else:
334
                OcrdResourceManager._copy_file(log, child, child_dst)
335
336
    @staticmethod
337
    def _copy_impl(log: Logger, src_filename, filename):
338
        log.info(f"Copying {src_filename} to {filename}")
339
        if Path(src_filename).is_dir():
340
            OcrdResourceManager._copy_dir(log, src_filename, filename)
341
        else:
342
            OcrdResourceManager._copy_file(log, src_filename, filename)
343
344
    @staticmethod
    def _extract_archive(log: Logger, tempdir: Path, path_in_archive: str, fpath: Path, archive_fname: str):
        """
        Unpack an archive into ``out/`` and copy one member to ``fpath``

        Works with relative paths: ``out`` is created in the current working
        directory and the archive is addressed as ``../<archive_fname>`` from
        inside it, so the caller is expected to have made the directory that
        holds the archive the current one. ``tempdir`` is only used for the
        log message.

        Raises:
            RuntimeError: if the guessed media type is neither zip, gzip nor xz
        """
        # NOTE(review): extractall() is used on the downloaded archive as-is;
        # confirm archives are trusted or that member paths are validated.
        Path('out').mkdir()
        with pushd_popd('out'):
            archive_path = f'../{archive_fname}'
            mimetype = guess_media_type(archive_path, fallback='application/octet-stream')
            log.info(f"Extracting {mimetype} archive to {tempdir}/out")
            if mimetype == 'application/zip':
                with ZipFile(archive_path, 'r') as zipf:
                    zipf.extractall()
            elif mimetype in ('application/gzip', 'application/x-xz'):
                with open_tarfile(archive_path, 'r:*') as tar:
                    tar.extractall()
            else:
                raise RuntimeError(f"Unable to handle extraction of {mimetype} archive")
            log.info(f"Copying '{path_in_archive}' from archive to {fpath}")
            copy_member = copytree if Path(path_in_archive).is_dir() else copy
            copy_member(path_in_archive, str(fpath))
363
364
    def copy_resource(
365
        self, log: Logger, url: str, fpath: Path, resource_type: str = 'file', path_in_archive: str = '.'
366
    ) -> Path:
367
        """
368
        Copy a local resource to another destination
369
        """
370
        if resource_type == 'archive':
371
            archive_fname = 'download.tar.xx'
372
            with pushd_popd(tempdir=True) as tempdir:
373
                self._copy_impl(log, url, archive_fname)
374
                self._extract_archive(log, tempdir, path_in_archive, fpath, archive_fname)
375
        else:
376
            self._copy_impl(log, url, fpath)
377
        return fpath
378
379
    def download_resource(
        self, log: Logger, url: str, fpath: Path, resource_type: str = 'file', path_in_archive: str = '.'
    ) -> Path:
        """
        Download a resource by URL to a destination directory

        For ``resource_type == 'archive'`` the download goes into a temporary
        directory and ``path_in_archive`` is extracted to ``fpath``; otherwise
        the download is written to ``fpath`` directly.

        Returns:
            ``fpath``
        """
        if resource_type != 'archive':
            self._download_impl(log, url, fpath)
            return fpath
        archive_fname = 'download.tar.xx'
        with pushd_popd(tempdir=True) as tempdir:
            self._download_impl(log, url, archive_fname)
            self._extract_archive(log, tempdir, path_in_archive, fpath, archive_fname)
        return fpath
393
394
    # TODO Proper caching (make head request for size, If-Modified etc)
395
    def handle_resource(
        self, res_dict: Dict, executable: str, dest_dir: Path, any_url: str, overwrite: bool = False,
        resource_type: str = 'file', path_in_archive: str = '.'
    ) -> Optional[Path]:
        """
        Download or Copy a resource by URL to a destination directory

        ``type``, ``name`` and ``path_in_archive`` of ``res_dict`` take
        precedence over the corresponding arguments. ``res_dict`` is modified
        in place: ``url`` is set from ``any_url`` (if truthy) and, for local
        sources, to the resolved path; ``size`` is filled in when missing (or
        always, for local sources).

        Returns:
            the installed path; the existing path if it is already present and
            ``overwrite`` is false; ``None`` if the URL is the ``'???'`` stub

        Raises:
            ValueError: if the resource type is not one of ``RESOURCE_TYPES``
        """
        log = getLogger('ocrd.resource_manager.handle_resource')
        # A resource dict that already carries a size is treated as registered
        registered = "registered" if "size" in res_dict else "unregistered"
        resource_type = res_dict.get('type', resource_type)
        resource_name = res_dict.get('name', None)
        path_in_archive = res_dict.get('path_in_archive', path_in_archive)

        if resource_type not in RESOURCE_TYPES:
            raise ValueError(f"Unknown resource type: {resource_type}, must be one of: {RESOURCE_TYPES}")
        if any_url:
            res_dict['url'] = any_url
        if not resource_name:
            # Fall back to the last path component of the URL, or of the
            # member path for archives
            resource_name = Path(unquote(urlparse(res_dict['url']).path)).name
            if resource_type == 'archive' and path_in_archive != '.':
                resource_name = Path(path_in_archive).name
        url = res_dict['url']
        if url == '???':
            log.warning(f"Skipping user resource {resource_name} since download url is: {url}")
            return None

        fpath = Path(dest_dir, resource_name)
        if fpath.exists():
            if not overwrite:
                fpath_type = 'Directory' if fpath.is_dir() else 'File'
                log.warning(f"{fpath_type} {fpath} already exists but --overwrite is not set, skipping the download")
                # raise FileExistsError(f"{fpath_type} {fpath} already exists but --overwrite is not set")
                return fpath
            self.remove_resource(log, resource_path=fpath)
        dest_dir.mkdir(parents=True, exist_ok=True)

        # TODO @mehmedGIT: Consider properly handling cases for invalid URLs.
        if url.startswith(('https://', 'http://')):
            log.info(f"Downloading {registered} resource '{resource_name}' ({url})")
            if 'size' not in res_dict:
                with requests.head(url) as r:
                    res_dict['size'] = int(r.headers.get('content-length', 0))
            fpath = self.download_resource(log, url, fpath, resource_type, path_in_archive)
        else:
            log.info(f"Copying {registered} resource '{resource_name}' ({url})")
            urlpath = Path(url)
            url = str(urlpath.resolve())
            res_dict['url'] = url
            res_dict['size'] = directory_size(urlpath) if urlpath.is_dir() else urlpath.stat().st_size
            fpath = self.copy_resource(log, url, fpath, resource_type, path_in_archive)

        if registered == 'unregistered':
            self.add_to_user_database(executable, fpath, url=url)
        self.save_user_list()
        log.info(f"Installed resource {url} under {fpath}")
        return fpath
450
451
    def _dedup_database(self, database=None, dedup_key='name'):
452
        """
453
        Deduplicate resources by name
454
        """
455
        if not database:
456
            database = self.database
457
        for executable, reslist in database.items():
458
            reslist_dedup = []
459
            for resdict in reslist:
460
                if not any(r[dedup_key] == resdict[dedup_key] for r in reslist_dedup):
461
                    reslist_dedup.append(resdict)
462
            database[executable] = reslist_dedup
463
        return database
464