Passed
Pull Request — master (#1309)
by
unknown
03:13
created

OcrdResourceManager.handle_resource()   F

Complexity

Conditions 15

Size

Total Lines 54
Code Lines 43

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 43
dl 0
loc 54
rs 2.9998
c 0
b 0
f 0
cc 15
nop 8

How to fix   Long Method    Complexity    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

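Commonly applied refactorings include Extract Method and, when many parameters or temporary variables are involved, Replace Temp with Query, Introduce Parameter Object, and Preserve Whole Object.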

Complexity

Complex methods like ocrd.resource_manager.OcrdResourceManager.handle_resource() often do a lot of different things. To break the enclosing class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists, such as Introduce Parameter Object, Preserve Whole Object, and Replace Parameter with Method Call.

1
from logging import Logger
2
from pathlib import Path
3
from os.path import join
4
from os import environ, listdir, getcwd, unlink
5
from shutil import copytree, rmtree, copy
6
from fnmatch import filter as apply_glob
7
from datetime import datetime
8
from tarfile import open as open_tarfile
9
from typing import Dict, Optional
10
from urllib.parse import urlparse, unquote
11
from zipfile import ZipFile
12
13
import requests
14
from gdown.parse_url import parse_url as gparse_url
15
from gdown.download import get_url_from_gdrive_confirmation
16
from yaml import safe_load, safe_dump
17
18
# pylint: disable=wrong-import-position
19
20
# https://github.com/OCR-D/core/issues/867
# https://stackoverflow.com/questions/50900727/skip-converting-entities-while-loading-a-yaml-string-using-pyyaml
# Register the plain-string constructor for the YAML timestamp tag, so the
# safe constructor builds timestamp-tagged scalars the same way as strings.
import yaml.constructor
yaml.constructor.SafeConstructor.yaml_constructors['tag:yaml.org,2002:timestamp'] = (
    yaml.constructor.SafeConstructor.yaml_constructors['tag:yaml.org,2002:str']
)
25
26
# pylint: enable=wrong-import-position
27
28
# pylint: enable=wrong-import-position
29
30
# pylint: enable=wrong-import-position
31
32
from ocrd_validators import OcrdResourceListValidator
33
from ocrd_utils import getLogger, directory_size, get_moduledir, guess_media_type, config
34
from ocrd_utils.constants import RESOURCES_DIR_SYSTEM, RESOURCE_TYPES
35
from ocrd_utils.os import get_processor_resource_types, list_all_resources, pushd_popd, get_ocrd_tool_json
36
from .constants import RESOURCE_LIST_FILENAME, RESOURCE_USER_LIST_COMMENT
37
38
39
class OcrdResourceManager:
40
41
    """
42
    Managing processor resources
43
    """
44
    def __init__(self, userdir=None, xdg_config_home=None, xdg_data_home=None, skip_init=False):
        """
        Set up paths and, unless ``skip_init`` is set, load the resource lists

        The bundled list (``RESOURCE_LIST_FILENAME``) is loaded first, then the
        user list ``<xdg_config_home>/ocrd/resources.yml``, which is created
        (including its parent directory) if it does not exist yet.
        """
        self.log = getLogger('ocrd.resource_manager')
        self.database = {}

        self._xdg_data_home = xdg_data_home
        self._xdg_config_home = xdg_config_home
        self._userdir = userdir
        self.user_list = Path(self.xdg_config_home, 'ocrd', 'resources.yml')

        self.log.info(f"OcrdResourceManager data home path: {self.xdg_data_home}")
        self.log.info(f"OcrdResourceManager config home path: {self.xdg_config_home}")
        self.log.info(f"OcrdResourceManager user list path: {self.user_list}")

        if skip_init:
            return
        self.load_resource_list(Path(RESOURCE_LIST_FILENAME))
        if not self.user_list.exists():
            list_dir = self.user_list.parent
            if not list_dir.exists():
                list_dir.mkdir(parents=True)
            self.save_user_list()
        self.load_resource_list(self.user_list)
64
65
    @property
66
    def userdir(self):
67
        if not self._userdir:
68
            self._userdir = config.HOME
69
        return self._userdir
70
71
    @property
72
    def xdg_data_home(self):
73
        if not self._xdg_data_home:
74
            self._xdg_data_home = config.XDG_DATA_HOME
75
        return self._xdg_data_home
76
77
    @property
78
    def xdg_config_home(self):
79
        if not self._xdg_config_home:
80
            self._xdg_config_home = config.XDG_CONFIG_HOME
81
        return self._xdg_config_home
82
83
    def save_user_list(self, database=None):
        """
        Write a resource database to the user list file (``self.user_list``)

        Args:
            database: mapping of executable name to list of resource dicts.
                Falls back to ``self.database`` when omitted or empty.

        The file starts with ``RESOURCE_USER_LIST_COMMENT``, followed by the
        YAML dump of the database. Entries are deduplicated by name before
        writing.
        """
        if not database:
            database = self.database
        self.log.info(f"Saving resources to path: {self.user_list}")
        # The in-memory registry has always been deduplicated on save ...
        self._dedup_database()
        # ... but the database actually being written must be deduplicated too
        if database is not self.database:
            self._dedup_database(database)
        with open(self.user_list, 'w', encoding='utf-8') as f:
            f.write(RESOURCE_USER_LIST_COMMENT)
            f.write('\n')
            f.write(safe_dump(database))
92
93
    def load_resource_list(self, list_filename: Path, database=None):
94
        self.log.info(f"Loading resources from path: {list_filename}")
95
        if not database:
96
            database = self.database
97
        if list_filename.is_file():
98
            with open(list_filename, 'r', encoding='utf-8') as f:
99
                list_loaded = safe_load(f) or {}
100
            report = OcrdResourceListValidator.validate(list_loaded)
101
            if not report.is_valid:
102
                self.log.error('\n'.join(report.errors))
103
                raise ValueError(f"Resource list {list_filename} is invalid!")
104
            for executable, resource_list in list_loaded.items():
105
                if executable not in database:
106
                    database[executable] = []
107
                # Prepend, so user provided is sorted before builtin
108
                database[executable] = list_loaded[executable] + database[executable]
109
        return database
110
111
    def _search_executables(self, executable: Optional[str]):
112
        skip_executables = ["ocrd-cis-data", "ocrd-import", "ocrd-make"]
113
        for exec_dir in environ['PATH'].split(':'):
114
            self.log.debug(f"Searching for executables inside path: {exec_dir}")
115
            for exec_path in Path(exec_dir).glob(f'{executable}'):
116
                if not exec_path.name.startswith('ocrd-'):
117
                    self.log.warning(f"OCR-D processor executable '{exec_path}' has no 'ocrd-' prefix")
118
                if exec_path.name in skip_executables:
119
                    self.log.debug(f"Not an OCR-D processor CLI, skipping '{exec_path}'")
120
                    continue
121
                self.log.debug(f"Inspecting '{exec_path} --dump-json' for resources")
122
                ocrd_tool = get_ocrd_tool_json(exec_path)
123
                for res_dict in ocrd_tool.get('resources', ()):
124
                    if exec_path.name not in self.database:
125
                        self.database[exec_path.name] = []
126
                    self.database[exec_path.name].insert(0, res_dict)
127
128
    def list_available(
129
        self, executable: str = None, dynamic: bool = True, name: str = None, database: Dict = None, url: str = None
130
    ):
131
        """
132
        List models available for download by processor
133
        """
134
        if not database:
135
            database = self.database
136
        if not executable:
137
            return database.items()
138
        if dynamic:
139
            self._search_executables(executable)
140
            self.save_user_list()
141
        found = False
142
        ret = []
143
        for k in database:
144
            if apply_glob([k], executable):
145
                found = True
146
                restuple = (k, [])
147
                ret.append(restuple)
148
                for resdict in database[k]:
149
                    if name and resdict['name'] != name:
150
                        continue
151
                    if url and resdict['url'] != url:
152
                        continue
153
                    restuple[1].append(resdict)
154
        if not found:
155
            ret = [(executable, [])]
156
        return ret
157
158
    def list_installed(self, executable: str = None):
        """
        List installed resources, matching with registry by ``name``

        Args:
            executable: restrict the listing to this executable. Otherwise all
                executables in the database plus all ``ocrd-*`` entries found in
                the data and system resource directories are inspected.

        Returns:
            A list of ``(executable, resource_dicts)`` tuples. Resources that are
            neither registered nor located under the module directory are added
            to the user list as stubs. The user list is saved before returning.
        """
        ret = []
        if executable:
            all_executables = [executable]
        else:
            # resources we know about
            all_executables = list(self.database.keys())
            # resources in the file system
            parent_dirs = [f"{join(self.xdg_data_home, 'ocrd-resources')}", RESOURCES_DIR_SYSTEM]
            for parent_dir in parent_dirs:
                if Path(parent_dir).exists():
                    all_executables += [x for x in listdir(parent_dir) if x.startswith('ocrd-')]
        for this_executable in set(all_executables):
            reslist = []
            mimetypes = get_processor_resource_types(this_executable)
            moduledir = get_moduledir(this_executable)
            for res_filename in list_all_resources(this_executable, moduled=moduledir, xdg_data_home=self.xdg_data_home):
                res_filename = Path(res_filename)
                if '*/*' not in mimetypes:
                    if res_filename.is_dir() and 'text/directory' not in mimetypes:
                        continue
                    if res_filename.is_file() and ['text/directory'] == mimetypes:
                        continue
                res_name = res_filename.name
                res_is_file = res_filename.is_file()
                res_type = 'file' if res_is_file else 'directory'
                resdict_list = [x for x in self.database.get(this_executable, []) if x['name'] == res_name]
                if resdict_list:
                    resdict = resdict_list[0]
                # moduledir may be empty/None (build_resource_dest_dir guards for that as well),
                # in which case nothing can be located "at module"
                elif moduledir and str(res_filename.parent).startswith(moduledir):
                    resdict = {
                        'name': res_name,
                        'url': str(res_filename),
                        'description': 'Found at module',
                        'type': res_type,
                        # the size is only needed for this ad-hoc entry
                        'size': res_filename.stat().st_size if res_is_file else directory_size(res_filename)
                    }
                else:
                    resdict = self.add_to_user_database(this_executable, res_filename, resource_type=res_type)
                # resdict['path'] = str(res_filename)
                reslist.append(resdict)
            ret.append((this_executable, reslist))
        self.save_user_list()
        return ret
205
206
    def add_to_user_database(self, executable, res_filename, url=None, resource_type='file'):
        """
        Add a stub entry to the user resource.yml

        The user list is re-read from disk. If it has no entry named like the
        file for ``executable``, a stub with placeholder values (``???``) is
        appended; otherwise the first existing entry is used. The user list is
        then saved and reloaded into the in-memory database.

        Returns:
            The resource dict (newly created stub or existing entry).
        """
        res_path = Path(res_filename)
        res_name = res_path.name
        self.log.info(f"{executable} resource '{res_name}' ({str(res_filename)}) not a known resource, "
                      f"creating stub in {self.user_list}'")
        res_size = directory_size(res_filename) if res_path.is_dir() else res_path.stat().st_size
        with open(self.user_list, 'r', encoding='utf-8') as f:
            user_database = safe_load(f) or {}
        user_database.setdefault(executable, [])
        matches = self.list_available(executable=executable, name=res_name, database=user_database)
        resources_found = matches[0][1]
        if resources_found:
            resdict = resources_found[0]
        else:
            resdict = {
                'name': res_name,
                'url': url if url else '???',
                'description': f'Found at {self.resource_dir_to_location(res_filename)} on {datetime.now()}',
                'version_range': '???',
                'type': resource_type,
                'size': res_size
            }
            user_database[executable].append(resdict)
        self.save_user_list(user_database)
        self.load_resource_list(self.user_list)
        return resdict
237
238
    @property
239
    def default_resource_dir(self):
240
        return self.location_to_resource_dir('data')
241
242
    def location_to_resource_dir(self, location: str) -> str:
243
        if location == 'data':
244
            return join(self.xdg_data_home, 'ocrd-resources')
245
        if location == 'system':
246
            return RESOURCES_DIR_SYSTEM
247
        return getcwd()
248
249
    def resource_dir_to_location(self, resource_path: Path) -> str:
        """
        Map a resource path back to a location keyword by string prefix

        Checked in this order: ``system``, ``data``, ``cwd``. If no prefix
        matches, the path itself is returned as a string.
        """
        path_str = str(resource_path)
        if path_str.startswith(RESOURCES_DIR_SYSTEM):
            location = 'system'
        elif path_str.startswith(join(self.xdg_data_home, 'ocrd-resources')):
            location = 'data'
        elif path_str.startswith(getcwd()):
            location = 'cwd'
        else:
            location = path_str
        return location
258
259
    def build_resource_dest_dir(self, location: str, executable: str) -> Path:
260
        if location == 'module':
261
            base_dir = get_moduledir(executable)
262
            if not base_dir:
263
                base_dir = self.location_to_resource_dir('data')
264
        else:
265
            base_dir = self.location_to_resource_dir(location)
266
        no_subdir = location in ['cwd', 'module']
267
        dest_dir = Path(base_dir) if no_subdir else Path(base_dir, executable)
268
        return dest_dir
269
270
    @staticmethod
271
    def remove_resource(log: Logger, resource_path: Path):
272
        if resource_path.is_dir():
273
            log.info(f"Removing existing target resource directory {resource_path}")
274
            rmtree(str(resource_path))
275
        else:
276
            log.info(f"Removing existing target resource file {resource_path}")
277
            unlink(str(resource_path))
278
279
    @staticmethod
280
    def parameter_usage(name: str, usage: str = 'as-is') -> str:
281
        if usage == 'as-is':
282
            return name
283
        elif usage == 'without-extension':
284
            return Path(name).stem
285
        raise ValueError(f"No such usage '{usage}'")
286
287
    @staticmethod
    def _download_impl(log: Logger, url: str, filename):
        """
        Stream the content of ``url`` into ``filename``

        Google Drive links are detected first and rewritten to a direct
        download URL; if the response carries no ``Content-Disposition`` header,
        the final URL is taken from the confirmation page. On any failure the
        (partial) target is removed and the exception re-raised.
        """
        log.info(f"Downloading {url} to {filename}")
        try:
            gdrive_file_id, is_gdrive_download_link = gparse_url(url, warning=False)
            if gdrive_file_id:
                if not is_gdrive_download_link:
                    url = f"https://drive.google.com/uc?id={gdrive_file_id}"
                try:
                    with requests.get(url, stream=True) as probe:
                        if "Content-Disposition" not in probe.headers:
                            url = get_url_from_gdrive_confirmation(probe.text)
                except RuntimeError as e:
                    # best effort: continue with the URL determined so far
                    log.warning(f"Cannot unwrap Google Drive URL: {e}")
            with open(filename, 'wb') as target, requests.get(url, stream=True) as response:
                response.raise_for_status()
                for block in response.iter_content(chunk_size=4096):
                    target.write(block)
        except Exception as err:
            # clean up whatever exists at the target, be it a directory or a file
            rmtree(filename, ignore_errors=True)
            Path(filename).unlink(missing_ok=True)
            raise err
310
311
    @staticmethod
312
    def _copy_file(log: Logger, src, dst):
313
        log.info(f"Copying file {src} to {dst}")
314
        with open(dst, 'wb') as f_out, open(src, 'rb') as f_in:
315
            while True:
316
                chunk = f_in.read(4096)
317
                if chunk:
318
                    f_out.write(chunk)
319
                else:
320
                    break
321
322
    @staticmethod
323
    def _copy_dir(log: Logger, src, dst):
324
        log.info(f"Copying dir recursively from {src} to {dst}")
325
        if not Path(src).is_dir():
326
            raise ValueError(f"The source is not a directory: {src}")
327
        Path(dst).mkdir(parents=True, exist_ok=True)
328
        for child in Path(src).rglob('*'):
329
            child_dst = Path(dst) / child.relative_to(src)
330
            if Path(child).is_dir():
331
                OcrdResourceManager._copy_dir(log, child, child_dst)
332
            else:
333
                OcrdResourceManager._copy_file(log, child, child_dst)
334
335
    @staticmethod
336
    def _copy_impl(log: Logger, src_filename, filename):
337
        log.info(f"Copying {src_filename} to {filename}")
338
        if Path(src_filename).is_dir():
339
            OcrdResourceManager._copy_dir(log, src_filename, filename)
340
        else:
341
            OcrdResourceManager._copy_file(log, src_filename, filename)
342
343
    @staticmethod
    def _extract_archive(log: Logger, tempdir: Path, path_in_archive: str, fpath: Path, archive_fname: str):
        """
        Unpack an archive into a fresh ``out`` directory and copy one member to ``fpath``

        All paths are relative: ``out`` is created in the current working
        directory and the archive is addressed as ``../<archive_fname>`` from
        within the ``pushd_popd('out')`` block. ``tempdir`` is only used for the
        log message. Zip, gzip and xz media types are supported; anything else
        raises ``RuntimeError``.

        NOTE(review): ``extractall()`` is called without restricting member
        paths - confirm that archives only come from trusted sources.
        """
        Path('out').mkdir()
        with pushd_popd('out'):
            archive_path = f'../{archive_fname}'
            mimetype = guess_media_type(archive_path, fallback='application/octet-stream')
            log.info(f"Extracting {mimetype} archive to {tempdir}/out")
            if mimetype in ('application/gzip', 'application/x-xz'):
                with open_tarfile(archive_path, 'r:*') as tar:
                    tar.extractall()
            elif mimetype == 'application/zip':
                with ZipFile(archive_path, 'r') as zipf:
                    zipf.extractall()
            else:
                raise RuntimeError(f"Unable to handle extraction of {mimetype} archive")
            log.info(f"Copying '{path_in_archive}' from archive to {fpath}")
            copy_member = copytree if Path(path_in_archive).is_dir() else copy
            copy_member(path_in_archive, str(fpath))
362
363
    def copy_resource(
364
        self, log: Logger, url: str, fpath: Path, resource_type: str = 'file', path_in_archive: str = '.'
365
    ) -> Path:
366
        """
367
        Copy a local resource to another destination
368
        """
369
        if resource_type == 'archive':
370
            archive_fname = 'download.tar.xx'
371
            with pushd_popd(tempdir=True) as tempdir:
372
                self._copy_impl(log, url, archive_fname)
373
                self._extract_archive(log, tempdir, path_in_archive, fpath, archive_fname)
374
        else:
375
            self._copy_impl(log, url, fpath)
376
        return fpath
377
378
    def download_resource(
        self, log: Logger, url: str, fpath: Path, resource_type: str = 'file', path_in_archive: str = '.'
    ) -> Path:
        """
        Download a resource by URL to a destination directory

        For ``archive`` resources, the download goes into a temporary
        directory, is extracted there, and ``path_in_archive`` is copied to
        ``fpath``. All other types are downloaded to ``fpath`` directly.

        Returns:
            ``fpath``
        """
        if resource_type != 'archive':
            self._download_impl(log, url, fpath)
            return fpath
        archive_fname = 'download.tar.xx'
        with pushd_popd(tempdir=True) as tempdir:
            self._download_impl(log, url, archive_fname)
            self._extract_archive(log, tempdir, path_in_archive, fpath, archive_fname)
        return fpath
392
393
    # TODO Proper caching (make head request for size, If-Modified etc)
    def handle_resource(
        self, res_dict: Dict, executable: str, dest_dir: Path, any_url: str, overwrite: bool = False,
        resource_type: str = 'file', path_in_archive: str = '.'
    ) -> Optional[Path]:
        """
        Download or Copy a resource by URL to a destination directory

        Args:
            res_dict: resource description; ``url`` and ``size`` are updated in place.
                A dict without ``size`` is treated as unregistered.
            executable: name of the processor the resource belongs to.
            dest_dir: directory to install into (created if missing).
            any_url: if set, overrides ``res_dict['url']``.
            overwrite: replace an already existing target instead of skipping.
            resource_type: fallback if ``res_dict`` has no ``type``.
            path_in_archive: fallback if ``res_dict`` has no ``path_in_archive``.

        Returns:
            The path of the installed resource; the path of the existing target
            if it was kept because ``overwrite`` is not set; ``None`` if the URL
            is the ``???`` placeholder.

        Raises:
            ValueError: if the resource type is not one of ``RESOURCE_TYPES``.
        """
        log = getLogger('ocrd.resource_manager.handle_resource')
        registered = "registered" if "size" in res_dict else "unregistered"
        resource_type = res_dict.get('type', resource_type)
        if resource_type not in RESOURCE_TYPES:
            raise ValueError(f"Unknown resource type: {resource_type}, must be one of: {RESOURCE_TYPES}")
        if any_url:
            res_dict['url'] = any_url
        resource_name = res_dict.get('name', None) or self._resource_name_from_url(res_dict['url'])
        if res_dict['url'] == '???':
            log.warning(f"Skipping user resource {resource_name} since download url is: {res_dict['url']}")
            return None

        fpath = Path(dest_dir, resource_name)
        if not self._clear_destination(log, fpath, overwrite):
            return fpath
        dest_dir.mkdir(parents=True, exist_ok=True)
        path_in_archive = res_dict.get('path_in_archive', path_in_archive)

        # TODO @mehmedGIT: Consider properly handling cases for invalid URLs.
        if res_dict['url'].startswith(('https://', 'http://')):
            log.info(f"Downloading {registered} resource '{resource_name}' ({res_dict['url']})")
            fpath = self._fetch_remote(log, res_dict, fpath, resource_type, path_in_archive)
        else:
            log.info(f"Copying {registered} resource '{resource_name}' ({res_dict['url']})")
            fpath = self._fetch_local(log, res_dict, fpath, resource_type, path_in_archive)

        if registered == 'unregistered':
            log.info(f"{executable} resource '{resource_name}' ({res_dict['url']}) not a known resource, creating stub "
                     f"in {self.user_list}'")
            # Forward the actual type, so the stub is not always recorded as 'file'
            self.add_to_user_database(executable, fpath, url=res_dict['url'], resource_type=resource_type)
        self.save_user_list()
        log.info(f"Installed resource {res_dict['url']} under {fpath}")
        return fpath

    @staticmethod
    def _resource_name_from_url(url: str) -> str:
        """
        Derive a resource name from the last (unquoted) path segment of a URL
        """
        return Path(unquote(urlparse(url).path)).name

    def _clear_destination(self, log: Logger, fpath: Path, overwrite: bool) -> bool:
        """
        Make room at ``fpath``; returns False if an existing target has to be kept
        """
        if not fpath.exists():
            return True
        if not overwrite:
            fpath_type = 'Directory' if fpath.is_dir() else 'File'
            # deliberately only a warning, not a FileExistsError
            log.warning(f"{fpath_type} {fpath} already exists but --overwrite is not set, skipping the download")
            return False
        self.remove_resource(log, resource_path=fpath)
        return True

    def _fetch_remote(
        self, log: Logger, res_dict: Dict, fpath: Path, resource_type: str, path_in_archive: str
    ) -> Path:
        """
        Download an http(s) resource, recording the size from a HEAD request if not yet known
        """
        if 'size' not in res_dict:
            with requests.head(res_dict['url']) as r:
                res_dict['size'] = int(r.headers.get('content-length', 0))
        return self.download_resource(log, res_dict['url'], fpath, resource_type, path_in_archive)

    def _fetch_local(
        self, log: Logger, res_dict: Dict, fpath: Path, resource_type: str, path_in_archive: str
    ) -> Path:
        """
        Copy a local resource, recording its resolved path and its size on disk
        """
        urlpath = Path(res_dict['url'])
        res_dict['url'] = str(urlpath.resolve())
        res_dict['size'] = directory_size(urlpath) if urlpath.is_dir() else urlpath.stat().st_size
        return self.copy_resource(log, res_dict['url'], fpath, resource_type, path_in_archive)
448
449
    def _dedup_database(self, database=None, dedup_key='name'):
450
        """
451
        Deduplicate resources by name
452
        """
453
        if not database:
454
            database = self.database
455
        for executable, reslist in database.items():
456
            reslist_dedup = []
457
            for resdict in reslist:
458
                if not any(r[dedup_key] == resdict[dedup_key] for r in reslist_dedup):
459
                    reslist_dedup.append(resdict)
460
            database[executable] = reslist_dedup
461
        return database
462