Passed
Pull Request — master (#1340)
by Konstantin
02:19
created

ocrd.resource_manager   F

Complexity

Total Complexity 115

Size/Duplication

Total Lines 381
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 115
eloc 307
dl 0
loc 381
rs 2
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex classes like ocrd.resource_manager often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
from logging import Logger
2
from pathlib import Path
3
from os.path import join
4
from os import environ, listdir, getcwd, unlink
5
from shutil import copytree, rmtree, copy
6
from fnmatch import filter as apply_glob
7
from datetime import datetime
8
from tarfile import open as open_tarfile
9
from typing import Dict, Optional
10
from urllib.parse import urlparse, unquote
11
from zipfile import ZipFile
12
13
import requests
14
from gdown.parse_url import parse_url as gparse_url
15
from gdown.download import get_url_from_gdrive_confirmation
16
from git import Repo
17
from yaml import safe_load, safe_dump
18
19
# pylint: disable=wrong-import-position
20
21
# https://github.com/OCR-D/core/issues/867
22
# https://stackoverflow.com/questions/50900727/skip-converting-entities-while-loading-a-yaml-string-using-pyyaml
23
import yaml.constructor
24
yaml.constructor.SafeConstructor.yaml_constructors['tag:yaml.org,2002:timestamp'] = \
25
    yaml.constructor.SafeConstructor.yaml_constructors['tag:yaml.org,2002:str']
26
27
# pylint: enable=wrong-import-position
28
29
# pylint: enable=wrong-import-position
30
31
# pylint: enable=wrong-import-position
32
33
from ocrd_validators import OcrdResourceListValidator
34
from ocrd_utils import getLogger, directory_size, get_moduledir, guess_media_type, config
35
from ocrd_utils.constants import RESOURCES_DIR_SYSTEM, RESOURCE_TYPES, MIME_TO_EXT
36
from ocrd_utils.os import get_processor_resource_types, is_git_url, list_all_resources, pushd_popd, get_ocrd_tool_json
37
from .constants import RESOURCE_USER_LIST_COMMENT
38
39
40
class OcrdResourceManager:
41
42
    """
43
    Managing processor resources
44
    """
45
    def __init__(self, userdir=None, xdg_config_home=None, xdg_data_home=None, skip_init=False):
        """
        Set up logger, in-memory database and the path of the user resource list.

        Args:
            userdir: explicit user directory (the ``userdir`` property falls
                back to ``config.HOME`` when this is falsy)
            xdg_config_home: explicit config home (falls back to ``config.XDG_CONFIG_HOME``)
            xdg_data_home: explicit data home (falls back to ``config.XDG_DATA_HOME``)
            skip_init: when true, the user list is neither created nor loaded
        """
        self.log = getLogger('ocrd.resource_manager')
        self.database = {}

        self._xdg_data_home = xdg_data_home
        self._xdg_config_home = xdg_config_home
        self._userdir = userdir
        self.user_list = Path(self.xdg_config_home, 'ocrd', 'resources.yml')

        logged_paths = (
            ('data home', self.xdg_data_home),
            ('config home', self.xdg_config_home),
            ('user list', self.user_list),
        )
        for label, value in logged_paths:
            self.log.info(f"OcrdResourceManager {label} path: {value}")

        if skip_init:
            return
        if not self.user_list.exists():
            # First run: make sure the directory exists and write an initial list
            list_dir = self.user_list.parent
            if not list_dir.exists():
                list_dir.mkdir(parents=True)
            self.save_user_list()
        self.load_resource_list(self.user_list)
64
65
    def __repr__(self):
66
        return f"user_list={str(self.user_list)} " + \
67
               f"exists={self.user_list.exists()} " + \
68
               f"database: {len(self.database)} executables " + \
69
               f"{sum(map(len, self.database.values()))} resources"
70
71
    @property
72
    def userdir(self):
73
        if not self._userdir:
74
            self._userdir = config.HOME
75
        return self._userdir
76
77
    @property
78
    def xdg_data_home(self):
79
        if not self._xdg_data_home:
80
            self._xdg_data_home = config.XDG_DATA_HOME
81
        return self._xdg_data_home
82
83
    @property
84
    def xdg_config_home(self):
85
        if not self._xdg_config_home:
86
            self._xdg_config_home = config.XDG_CONFIG_HOME
87
        return self._xdg_config_home
88
89
    def save_user_list(self, database=None):
        """
        Write a resource database to the user list file (``self.user_list``).

        The file starts with ``RESOURCE_USER_LIST_COMMENT`` followed by the
        YAML dump of the database.

        Args:
            database: mapping of executable name to list of resource dicts;
                when falsy, ``self.database`` is written instead.
        """
        if not database:
            database = self.database
        self.log.info(f"Saving resources to path: {self.user_list}")
        # Deduplicate the database that is actually written; calling
        # _dedup_database() without an argument would only ever clean
        # self.database and leave duplicates in a caller-supplied one.
        self._dedup_database(database)
        with open(self.user_list, 'w', encoding='utf-8') as f:
            f.write(RESOURCE_USER_LIST_COMMENT)
            f.write('\n')
            f.write(safe_dump(database))
98
99
    def load_resource_list(self, list_filename: Path, database=None):
100
        self.log.info(f"Loading resources from path: {list_filename}")
101
        if not database:
102
            database = self.database
103
        if list_filename.is_file():
104
            with open(list_filename, 'r', encoding='utf-8') as f:
105
                list_loaded = safe_load(f) or {}
106
            report = OcrdResourceListValidator.validate(list_loaded)
107
            if not report.is_valid:
108
                self.log.error('\n'.join(report.errors))
109
                raise ValueError(f"Resource list {list_filename} is invalid!")
110
            for executable, resource_list in list_loaded.items():
111
                if executable not in database:
112
                    database[executable] = []
113
                # Prepend, so user provided is sorted before builtin
114
                database[executable] = list_loaded[executable] + database[executable]
115
        return database
116
117
    def _search_executables(self, executable: Optional[str]):
118
        skip_executables = ["ocrd-cis-data", "ocrd-import", "ocrd-make"]
119
        for exec_dir in environ['PATH'].split(':'):
120
            self.log.debug(f"Searching for executables inside path: {exec_dir}")
121
            for exec_path in Path(exec_dir).glob(f'{executable}'):
122
                if not exec_path.name.startswith('ocrd-'):
123
                    self.log.warning(f"OCR-D processor executable '{exec_path}' has no 'ocrd-' prefix")
124
                if exec_path.name in skip_executables:
125
                    self.log.debug(f"Not an OCR-D processor CLI, skipping '{exec_path}'")
126
                    continue
127
                self.log.debug(f"Inspecting '{exec_path} --dump-json' for resources")
128
                ocrd_tool = get_ocrd_tool_json(exec_path)
129
                for res_dict in ocrd_tool.get('resources', ()):
130
                    if exec_path.name not in self.database:
131
                        self.database[exec_path.name] = []
132
                    self.database[exec_path.name].insert(0, res_dict)
133
134
    def list_available(
135
        self, executable: str = None, dynamic: bool = True, name: str = None, database: Dict = None, url: str = None
136
    ):
137
        """
138
        List models available for download by processor
139
        """
140
        if not database:
141
            database = self.database
142
        if not executable:
143
            return list(database.items())
144
        if dynamic:
145
            self._search_executables(executable)
146
            self.save_user_list()
147
        found = False
148
        ret = []
149
        for k in database:
150
            if apply_glob([k], executable):
151
                found = True
152
                restuple = (k, [])
153
                ret.append(restuple)
154
                for resdict in database[k]:
155
                    if name and resdict['name'] != name:
156
                        continue
157
                    if url and resdict['url'] != url:
158
                        continue
159
                    restuple[1].append(resdict)
160
        if not found:
161
            ret = [(executable, [])]
162
        return ret
163
164
    def list_installed(self, executable: str = None):
        """
        List installed resources, matching with registry by ``name``

        Args:
            executable: restrict the listing to this executable; when falsy,
                all executables known to the database plus all ``ocrd-*``
                entries in the data and system resource directories are used

        Returns:
            list of ``(executable, [resource dicts])`` tuples. A resource file
            without a registry entry is described ad hoc if it lives under the
            module directory, otherwise a stub is added to the user database.
        """
        ret = []
        if executable:
            all_executables = [executable]
        else:
            # resources we know about
            all_executables = list(self.database.keys())
            # resources in the file system
            parent_dirs = [join(self.xdg_data_home, 'ocrd-resources'), RESOURCES_DIR_SYSTEM]
            for parent_dir in parent_dirs:
                if Path(parent_dir).exists():
                    all_executables += [x for x in listdir(parent_dir) if x.startswith('ocrd-')]
        for this_executable in set(all_executables):
            reslist = []
            moduledir = get_moduledir(this_executable)
            resdict_list = self.list_available(executable=this_executable)[0][1]
            for res_filename in list_all_resources(this_executable,
                                                   moduled=moduledir,
                                                   xdg_data_home=self.xdg_data_home):
                res_filename = Path(res_filename).resolve()
                res_name = res_filename.name
                res_type = 'file' if res_filename.is_file() else 'directory'
                res_size = res_filename.stat().st_size if res_filename.is_file() else directory_size(res_filename)
                resdict = next((res for res in resdict_list if res['name'] == res_name), False)
                if not resdict:
                    # A falsy moduledir (handled the same way in
                    # build_resource_dest_dir) must not reach str.startswith(),
                    # which raises TypeError for a non-str prefix.
                    if moduledir and str(res_filename.parent).startswith(str(moduledir)):
                        resdict = {
                            'name': res_name,
                            'url': str(res_filename),
                            'description': 'Found at module',
                            'type': res_type,
                            'size': res_size
                        }
                    else:
                        resdict = self.add_to_user_database(this_executable, res_filename, resource_type=res_type)
                # resdict['path'] = str(res_filename)
                reslist.append(resdict)
            ret.append((this_executable, reslist))
        self.save_user_list()
        return ret
207
208
    def add_to_user_database(self, executable, res_filename, url=None, resource_type='file'):
        """
        Add a stub entry to the user resource.yml

        Args:
            executable: name of the processor executable owning the resource
            res_filename: path (``Path`` or ``str``) of the installed resource
            url: source URL to record; ``'???'`` is stored when falsy
            resource_type: value stored under ``type`` in a new stub

        Returns:
            the already registered resource dict of that name, or the newly
            created stub
        """
        # Coerce once so that a plain string path works as well; the size
        # lookup below already treated the argument as path-like.
        res_path = Path(res_filename)
        res_name = res_path.name
        if res_path.is_dir():
            res_size = directory_size(res_filename)
        else:
            res_size = res_path.stat().st_size
        user_database = self.load_resource_list(self.user_list)
        if executable not in user_database:
            user_database[executable] = []
        resources_found = self.list_available(executable=executable, name=res_name, database=user_database)[0][1]
        if not resources_found:
            self.log.info(f"{executable} resource '{res_name}' ({str(res_filename)}) not a known resource, "
                          f"creating stub in {self.user_list}'")
            resdict = {
                'name': res_name,
                'url': url if url else '???',
                'description': f'Found at {self.resource_dir_to_location(res_filename)} on {datetime.now()}',
                'version_range': '???',
                'type': resource_type,
                'size': res_size
            }
            user_database[executable].append(resdict)
        else:
            resdict = resources_found[0]
        # Persist, then reload the file into the in-memory database
        self.save_user_list(user_database)
        self.load_resource_list(self.user_list)
        return resdict
238
239
    @property
240
    def default_resource_dir(self):
241
        return self.location_to_resource_dir('data')
242
243
    def location_to_resource_dir(self, location: str) -> str:
244
        if location == 'data':
245
            return join(self.xdg_data_home, 'ocrd-resources')
246
        if location == 'system':
247
            return RESOURCES_DIR_SYSTEM
248
        return getcwd()
249
250
    def resource_dir_to_location(self, resource_path: Path) -> str:
        """
        Map a resource path back to a location keyword.

        The path string is tested by prefix, in this order: system resource
        directory (``'system'``), data resource directory (``'data'``),
        current working directory (``'cwd'``). Without a match the path
        string itself is returned.
        """
        path_str = str(resource_path)
        location = path_str
        if path_str.startswith(RESOURCES_DIR_SYSTEM):
            location = 'system'
        elif path_str.startswith(join(self.xdg_data_home, 'ocrd-resources')):
            location = 'data'
        elif path_str.startswith(getcwd()):
            location = 'cwd'
        return location
259
260
    def build_resource_dest_dir(self, location: str, executable: str) -> Path:
261
        if location == 'module':
262
            base_dir = get_moduledir(executable)
263
            if not base_dir:
264
                base_dir = self.location_to_resource_dir('data')
265
        else:
266
            base_dir = self.location_to_resource_dir(location)
267
        no_subdir = location in ['cwd', 'module']
268
        dest_dir = Path(base_dir) if no_subdir else Path(base_dir, executable)
269
        return dest_dir
270
271
    @staticmethod
272
    def remove_resource(log: Logger, resource_path: Path):
273
        if resource_path.is_dir():
274
            log.info(f"Removing existing target resource directory {resource_path}")
275
            rmtree(str(resource_path))
276
        else:
277
            log.info(f"Removing existing target resource file {resource_path}")
278
            unlink(str(resource_path))
279
280
    @staticmethod
281
    def parameter_usage(name: str, usage: str = 'as-is') -> str:
282
        if usage == 'as-is':
283
            return name
284
        elif usage == 'without-extension':
285
            return Path(name).stem
286
        raise ValueError(f"No such usage '{usage}'")
287
288
    @staticmethod
    def _download_impl(log: Logger, url: str, filename):
        """
        Fetch ``url`` into ``filename``.

        Google Drive URLs are unwrapped first (best effort; a ``RuntimeError``
        during unwrapping is only logged). Git URLs are cloned with depth 1 and
        stripped of their ``.git`` directory; everything else is streamed over
        HTTP in 4096-byte chunks. On any error the (partial) target is removed
        and the exception is re-raised.
        """
        log.info(f"Downloading {url} to {filename}")
        try:
            gdrive_file_id, is_gdrive_download_link = gparse_url(url, warning=False)
            if gdrive_file_id:
                if not is_gdrive_download_link:
                    url = f"https://drive.google.com/uc?id={gdrive_file_id}"
                try:
                    with requests.get(url, stream=True) as response:
                        if "Content-Disposition" not in response.headers:
                            url = get_url_from_gdrive_confirmation(response.text)
                except RuntimeError as e:
                    log.warning(f"Cannot unwrap Google Drive URL: {e}")
            if not is_git_url(url):
                with open(filename, 'wb') as f, requests.get(url, stream=True) as response:
                    response.raise_for_status()
                    for data in response.iter_content(chunk_size=4096):
                        f.write(data)
            else:
                log.info("Cloning a git repository")
                Repo.clone_from(url, filename, depth=1)
                # keep only the checkout
                rmtree(join(filename, '.git'))
        except Exception:
            # The target may be a directory (git clone) or a file (HTTP)
            rmtree(filename, ignore_errors=True)
            Path(filename).unlink(missing_ok=True)
            raise
317
318
    @staticmethod
319
    def _copy_file(log: Logger, src, dst):
320
        log.info(f"Copying file {src} to {dst}")
321
        with open(dst, 'wb') as f_out, open(src, 'rb') as f_in:
322
            while True:
323
                chunk = f_in.read(4096)
324
                if chunk:
325
                    f_out.write(chunk)
326
                else:
327
                    break
328
329
    @staticmethod
330
    def _copy_dir(log: Logger, src, dst):
331
        log.info(f"Copying dir recursively from {src} to {dst}")
332
        if not Path(src).is_dir():
333
            raise ValueError(f"The source is not a directory: {src}")
334
        Path(dst).mkdir(parents=True, exist_ok=True)
335
        for child in Path(src).rglob('*'):
336
            child_dst = Path(dst) / child.relative_to(src)
337
            if Path(child).is_dir():
338
                OcrdResourceManager._copy_dir(log, child, child_dst)
339
            else:
340
                OcrdResourceManager._copy_file(log, child, child_dst)
341
342
    @staticmethod
343
    def _copy_impl(log: Logger, src_filename, filename):
344
        log.info(f"Copying {src_filename} to {filename}")
345
        if Path(src_filename).is_dir():
346
            OcrdResourceManager._copy_dir(log, src_filename, filename)
347
        else:
348
            OcrdResourceManager._copy_file(log, src_filename, filename)
349
350
    @staticmethod
    def _extract_archive(log: Logger, tempdir: Path, path_in_archive: str, fpath: Path, archive_fname: str):
        """
        Unpack an archive and copy one of its members to ``fpath``.

        Operates relative to the current working directory: a subdirectory
        ``out`` is created and entered, the archive ``../<archive_fname>`` is
        extracted there (zip, or tar for gzip/xz media types), and
        ``path_in_archive`` is copied to ``fpath`` (as a tree if it is a
        directory). ``tempdir`` is only used for logging.

        Raises:
            RuntimeError: if the archive's media type is not supported
        """
        Path('out').mkdir()
        with pushd_popd('out'):
            archive_path = f'../{archive_fname}'
            mimetype = guess_media_type(archive_path, fallback='application/octet-stream')
            log.info(f"Extracting {mimetype} archive to {tempdir}/out")
            if mimetype == 'application/zip':
                with ZipFile(archive_path, 'r') as zipf:
                    zipf.extractall()
            elif mimetype in ('application/gzip', 'application/x-xz'):
                with open_tarfile(archive_path, 'r:*') as tar:
                    tar.extractall()
            else:
                raise RuntimeError(f"Unable to handle extraction of {mimetype} archive")
            log.info(f"Copying '{path_in_archive}' from archive to {fpath}")
            copier = copytree if Path(path_in_archive).is_dir() else copy
            copier(path_in_archive, str(fpath))
369
370
    def copy_resource(
371
        self, log: Logger, url: str, fpath: Path, resource_type: str = 'file', path_in_archive: str = '.'
372
    ) -> Path:
373
        """
374
        Copy a local resource to another destination
375
        """
376
        if resource_type == 'archive':
377
            archive_fname = 'download.tar.xx'
378
            with pushd_popd(tempdir=True) as tempdir:
379
                self._copy_impl(log, url, archive_fname)
380
                self._extract_archive(log, tempdir, path_in_archive, fpath, archive_fname)
381
        else:
382
            self._copy_impl(log, url, fpath)
383
        return fpath
384
385
    def download_resource(
        self, log: Logger, url: str, fpath: Path, resource_type: str = 'file', path_in_archive: str = '.'
    ) -> Path:
        """
        Download a resource by URL to a destination directory

        For ``resource_type == 'archive'`` the download goes into a temporary
        directory and ``path_in_archive`` is extracted to ``fpath``; otherwise
        the download is written to ``fpath`` directly.

        Returns:
            ``fpath``
        """
        if resource_type != 'archive':
            self._download_impl(log, url, fpath)
            return fpath
        archive_fname = 'download.tar.xx'
        with pushd_popd(tempdir=True) as tempdir:
            self._download_impl(log, url, archive_fname)
            self._extract_archive(log, tempdir, path_in_archive, fpath, archive_fname)
        return fpath
399
400
    # TODO Proper caching (make head request for size, If-Modified etc)
401
    def handle_resource(
        self, res_dict: Dict, executable: str, dest_dir: Path, any_url: str, overwrite: bool = False,
        resource_type: str = 'file', path_in_archive: str = '.'
    ) -> Optional[Path]:
        """
        Download or Copy a resource by URL to a destination directory

        Args:
            res_dict: resource description; ``type``, ``name`` and
                ``path_in_archive`` take precedence over the arguments, and
                ``url``/``size`` are updated in place
            executable: processor executable the resource belongs to
            dest_dir: directory to install into (created if missing)
            any_url: when truthy, overrides ``res_dict['url']``
            overwrite: replace an already existing target instead of skipping
            resource_type: fallback resource type
            path_in_archive: fallback path inside an archive

        Returns:
            path of the installed (or already existing) resource, or ``None``
            if the resource has the placeholder URL ``'???'``

        Raises:
            ValueError: if the resource type is not in ``RESOURCE_TYPES``
        """
        log = getLogger('ocrd.resource_manager.handle_resource')
        is_registered = "size" in res_dict
        registered = "registered" if is_registered else "unregistered"
        resource_type = res_dict.get('type', resource_type)
        resource_name = res_dict.get('name', None)
        path_in_archive = res_dict.get('path_in_archive', path_in_archive)

        if resource_type not in RESOURCE_TYPES:
            raise ValueError(f"Unknown resource type: {resource_type}, must be one of: {RESOURCE_TYPES}")
        if any_url:
            res_dict['url'] = any_url
        if not resource_name:
            # Derive the name from the URL path, or from the archive member
            resource_name = Path(unquote(urlparse(res_dict['url']).path)).name
            if resource_type == 'archive' and path_in_archive != '.':
                resource_name = Path(path_in_archive).name
        url = res_dict['url']
        if url == '???':
            log.warning(f"Skipping user resource {resource_name} since download url is: {url}")
            return None

        fpath = Path(dest_dir, resource_name)
        if fpath.exists():
            if not overwrite:
                fpath_type = 'Directory' if fpath.is_dir() else 'File'
                log.warning(f"{fpath_type} {fpath} already exists but --overwrite is not set, skipping the download")
                # raise FileExistsError(f"{fpath_type} {fpath} already exists but --overwrite is not set")
                return fpath
            self.remove_resource(log, resource_path=fpath)
        dest_dir.mkdir(parents=True, exist_ok=True)

        # TODO @mehmedGIT: Consider properly handling cases for invalid URLs.
        if url.startswith(('https://', 'http://')):
            log.info(f"Downloading {registered} resource '{resource_name}' ({url})")
            if not is_registered:
                with requests.head(url) as r:
                    res_dict['size'] = int(r.headers.get('content-length', 0))
            fpath = self.download_resource(log, url, fpath, resource_type, path_in_archive)
        else:
            log.info(f"Copying {registered} resource '{resource_name}' ({url})")
            urlpath = Path(url)
            # Record the absolute local path as the resource URL
            url = str(urlpath.resolve())
            res_dict['url'] = url
            res_dict['size'] = directory_size(urlpath) if Path(urlpath).is_dir() else urlpath.stat().st_size
            fpath = self.copy_resource(log, url, fpath, resource_type, path_in_archive)

        if not is_registered:
            self.add_to_user_database(executable, fpath, url=url)
        self.save_user_list()
        log.info(f"Installed resource {url} under {fpath}")
        return fpath
456
457
    def _dedup_database(self, database=None, dedup_key='name'):
458
        """
459
        Deduplicate resources by name
460
        """
461
        if not database:
462
            database = self.database
463
        for executable, reslist in database.items():
464
            reslist_dedup = []
465
            for resdict in reslist:
466
                if not any(r[dedup_key] == resdict[dedup_key] for r in reslist_dedup):
467
                    reslist_dedup.append(resdict)
468
            database[executable] = reslist_dedup
469
        return database
470