Passed
Push — issue760-hugging-face-hub-inte... ( 9d030c...313511 )
by Juho
02:35
created

annif.cli_util   C

Complexity

Total Complexity 53

Size/Duplication

Total Lines 343
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 225
dl 0
loc 343
rs 6.96
c 0
b 0
f 0
wmc 53

26 Functions

Rating   Name   Duplication   Size   Complexity  
A format_datetime() 0 5 2
A get_vocab() 0 9 2
A _set_project_config_file_path() 0 7 3
A unzip() 0 3 2
A write_config() 0 8 1
A make_list_template() 0 14 3
A get_selected_project_ids_from_hf_hub() 0 6 1
A upload_to_hf_hub() 0 12 2
A backend_param_option() 0 9 1
A docs_limit_option() 0 10 1
A get_vocab_id() 0 5 1
A _validate_backend_params() 0 5 2
A download_from_hf_hub() 0 10 2
A common_options() 0 12 1
B open_documents() 0 31 5
A get_project() 0 8 2
A _is_train_file() 0 6 3
A generate_filter_params() 0 4 1
A _get_completion_choices() 0 9 3
A complete_param() 0 6 2
A archive_dir() 0 10 3
A parse_backend_params() 0 12 2
A project_id() 0 3 1
A open_text_documents() 0 18 4
A _list_files_in_hf_hub() 0 4 1
A show_hits() 0 20 2

How to fix   Complexity   

Complexity

Complex classes like annif.cli_util often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Utility functions for Annif CLI commands"""
2
3
from __future__ import annotations
4
5
import collections
6
import configparser
7
import io
8
import itertools
9
import os
10
import pathlib
11
import sys
12
import tempfile
13
import zipfile
14
from fnmatch import fnmatch
15
from typing import TYPE_CHECKING
16
17
import click
18
import click_log
19
from flask import current_app
20
from huggingface_hub import HfApi, hf_hub_download, list_repo_files
21
from huggingface_hub.utils import HfHubHTTPError, HFValidationError
22
23
import annif
24
from annif.exception import ConfigurationException, OperationFailedException
25
from annif.project import Access
26
27
if TYPE_CHECKING:
28
    from datetime import datetime
29
    from io import TextIOWrapper
30
31
    from click.core import Argument, Context, Option
32
33
    from annif.corpus.document import DocumentCorpus, DocumentList
34
    from annif.corpus.subject import SubjectIndex
35
    from annif.project import AnnifProject
36
    from annif.suggestion import SuggestionResult
37
    from annif.vocab import AnnifVocabulary
38
39
# Module-level alias of the logger object exposed by the annif package; the
# helpers in this module log through it.
logger = annif.logger
40
41
42
def _set_project_config_file_path(
    ctx: Context, param: Option, value: str | None
) -> None:
    """Click callback that stores the project configuration path in the app config.

    The application is loaded via ``ctx.obj.load_app()`` and, within its
    application context, ``PROJECTS_CONFIG_PATH`` is set to ``value`` when a
    non-empty value was given. With an empty value the configuration is left
    as it is (default path or the path given in the environment).
    """
    app = ctx.obj.load_app()
    with app.app_context():
        if not value:
            return
        current_app.config["PROJECTS_CONFIG_PATH"] = value
49
50
51
def common_options(f):
    """Decorate a CLI command with the options shared by all commands.

    Adds the eager ``-p/--projects`` option (handled by
    ``_set_project_config_file_path`` and not passed on to the command) and
    the verbosity option provided by click_log for the module logger.
    """
    add_projects_option = click.option(
        "-p",
        "--projects",
        help="Set path to project configuration file or directory",
        type=click.Path(dir_okay=True, exists=True),
        callback=_set_project_config_file_path,
        expose_value=False,
        is_eager=True,
    )
    f = add_projects_option(f)
    add_verbosity_option = click_log.simple_verbosity_option(logger)
    return add_verbosity_option(f)
63
64
65
def project_id(f):
    """Decorate a CLI command with a ``project_id`` argument.

    Shell completion for the argument is delegated to ``complete_param``.
    """
    add_argument = click.argument("project_id", shell_complete=complete_param)
    return add_argument(f)
68
69
70
def backend_param_option(f):
    """Decorate a CLI command with the repeatable ``-b/--backend-param`` option.

    The option lets the user override backend parameters of the config file.
    """
    help_text = (
        "Override backend parameter of the config file. "
        "Syntax: `-b <backend>.<parameter>=<value>`."
    )
    add_option = click.option(
        "--backend-param",
        "-b",
        multiple=True,
        help=help_text,
    )
    return add_option(f)
79
80
81
def docs_limit_option(f):
    """Decorate a CLI command with the ``-d/--docs-limit`` option.

    The option takes a non-negative integer and defaults to ``None``.
    """
    add_option = click.option(
        "--docs-limit",
        "-d",
        default=None,
        type=click.IntRange(min=0, max=None),
        help="Maximum number of documents to use",
    )
    return add_option(f)
91
92
93
def get_project(project_id: str) -> AnnifProject:
    """Return the project with the given ID, or exit if there is none.

    The project is looked up in the registry with private access. If the
    lookup raises ValueError, a message is written to stderr and the process
    exits with status 1.
    """
    try:
        project = annif.registry.get_project(project_id, min_access=Access.private)
    except ValueError:
        click.echo(f"No projects found with id '{project_id}'.", err=True)
        sys.exit(1)
    return project
101
102
103
def get_vocab(vocab_id: str) -> AnnifVocabulary:
    """Return the vocabulary with the given ID, or exit if there is none.

    The vocabulary is looked up in the registry with private access. If the
    lookup raises ValueError, a message is written to stderr and the process
    exits with status 1.
    """
    try:
        vocab = annif.registry.get_vocab(vocab_id, min_access=Access.private)
    except ValueError:
        message = f"No vocabularies found with the id '{vocab_id}'."
        click.echo(message, err=True)
        sys.exit(1)
    return vocab
112
113
114
def make_list_template(*rows) -> str:
    """Build a ``str.format`` template for printing rows as aligned columns.

    Each column becomes a left-aligned positional field padded to the length
    of the longest item found in that column over all given rows. Fields are
    separated by two spaces. Without any rows the template is an empty string.
    """
    widths = {}
    for row in rows:
        for column, item in enumerate(row):
            # Track the widest item seen so far in this column
            widths[column] = max(widths.get(column, 0), len(item))

    fields = (f"{{{column}: <{width}}}" for column, width in widths.items())
    return "  ".join(fields)
130
131
132
def format_datetime(dt: datetime | None) -> str:
    """Format a datetime in local time as ``YYYY-MM-DD HH:MM:SS``.

    Returns ``"-"`` when no datetime is given.
    """
    return "-" if dt is None else dt.astimezone().strftime("%Y-%m-%d %H:%M:%S")
137
138
139
def open_documents(
    paths: tuple[str, ...],
    subject_index: SubjectIndex,
    vocab_lang: str,
    docs_limit: int | None,
) -> DocumentCorpus:
    """Open a document corpus from the given pathnames.

    Directories are opened as ``DocumentDirectory`` (with the given vocabulary
    language and subjects required); any other path is opened as
    ``DocumentFile``. Several paths are merged into a ``CombinedCorpus``.
    Without any paths a warning is logged and the null device is read
    instead. When ``docs_limit`` is not None the result is wrapped in a
    ``LimitingDocumentCorpus``.
    """

    def _open_single(path):
        """Open one path as a corpus, choosing the type by directory-ness."""
        if not os.path.isdir(path):
            return annif.corpus.DocumentFile(path, subject_index)
        return annif.corpus.DocumentDirectory(
            path, subject_index, vocab_lang, require_subjects=True
        )

    if not paths:
        logger.warning("Reading empty file")
        corpus = _open_single(os.path.devnull)
    elif len(paths) == 1:
        corpus = _open_single(paths[0])
    else:
        corpus = annif.corpus.CombinedCorpus(
            [_open_single(single_path) for single_path in paths]
        )

    if docs_limit is None:
        return corpus
    return annif.corpus.LimitingDocumentCorpus(corpus, docs_limit)
170
171
172
def open_text_documents(paths: tuple[str, ...], docs_limit: int | None) -> DocumentList:
    """Read plain text documents from the given paths into a DocumentList.

    The documents carry no subjects. The path ``"-"`` means standard input;
    other paths are opened as UTF-8 (a leading BOM is dropped and undecodable
    bytes are replaced). At most ``docs_limit`` paths are read; ``None`` means
    no limit.
    """

    def _read_documents(doc_paths):
        # Lazily yield one Document per path
        for doc_path in doc_paths:
            if doc_path != "-":
                with open(doc_path, errors="replace", encoding="utf-8-sig") as infile:
                    text = infile.read()
            else:
                text = sys.stdin.read()
            yield annif.corpus.Document(text=text, subject_set=None)

    selected_paths = paths[:docs_limit]
    return annif.corpus.DocumentList(_read_documents(selected_paths))
190
191
192
def show_hits(
    hits: SuggestionResult,
    project: AnnifProject,
    lang: str,
    file: TextIOWrapper | None = None,
) -> None:
    """Print subject suggestions, one tab-separated row per hit.

    Each row holds the subject URI in angle brackets, the label in the given
    language, the notation (when the subject has one) and the score with four
    decimals. Rows are written with ``click.echo`` to ``file``.
    """
    for hit in hits:
        subject = project.subjects[hit.subject_id]
        uri = subject.uri
        # Leave out empty label/notation values instead of printing blanks
        label_fields = [
            field for field in (subject.labels[lang], subject.notation) if field
        ]
        label_column = "\t".join(label_fields)
        click.echo(f"<{uri}>\t{label_column}\t{hit.score:.04f}", file=file)
212
213
214
def parse_backend_params(
    backend_param: tuple[str, ...] | tuple[()], project: AnnifProject
) -> collections.defaultdict[str, dict[str, str]]:
    """Parse ``--backend-param`` values into a nested dict structure.

    Each value must have the form ``<backend>.<parameter>=<value>``. The
    result maps backend name -> parameter name -> value (all strings).

    Raises ConfigurationException when a value does not follow the expected
    syntax, or (via ``_validate_backend_params``) when the backend does not
    match the backend of the project.
    """
    backend_params = collections.defaultdict(dict)
    for beparam in backend_param:
        try:
            backend, param = beparam.split(".", 1)
            key, val = param.split("=", 1)
        except ValueError:
            # A missing "." or "=" would otherwise surface as an opaque
            # tuple-unpacking error; report it as a user-facing config error.
            raise ConfigurationException(
                f'The CLI option "-b {beparam}" is malformed; the expected '
                "syntax is `-b <backend>.<parameter>=<value>`."
            ) from None
        _validate_backend_params(backend, beparam, project)
        backend_params[backend][key] = val
    return backend_params
226
227
228
def _validate_backend_params(backend: str, beparam: str, project: AnnifProject) -> None:
229
    if backend != project.config["backend"]:
230
        raise ConfigurationException(
231
            'The backend {} in CLI option "-b {}" not matching the project'
232
            " backend {}.".format(backend, beparam, project.config["backend"])
233
        )
234
235
236
def generate_filter_params(filter_batch_max_limit: int) -> list[tuple[int, float]]:
    """Return all (limit, threshold) combinations to try when filtering.

    Limits run from 1 to ``filter_batch_max_limit`` inclusive; thresholds are
    the twenty values ``i * 0.05`` for ``i`` in 0..19. Pairs are ordered by
    limit first, then by threshold.
    """
    thresholds = [step * 0.05 for step in range(20)]
    return [
        (limit, threshold)
        for limit in range(1, filter_batch_max_limit + 1)
        for threshold in thresholds
    ]
240
241
242
def _is_train_file(fname):
243
    train_file_patterns = ("-train", "tmp-")
244
    for pat in train_file_patterns:
245
        if pat in fname:
246
            return True
247
    return False
248
249
250
def archive_dir(data_dir):
251
    fp = tempfile.TemporaryFile()
252
    path = pathlib.Path(data_dir)
253
    fpaths = [fpath for fpath in path.glob("**/*") if not _is_train_file(fpath.name)]
254
    with zipfile.ZipFile(fp, mode="w") as zfile:
255
        for fpath in fpaths:
256
            logger.debug(f"Adding {fpath}")
257
            zfile.write(fpath)
258
    fp.seek(0)
259
    return fp
260
261
262
def write_config(project):
    """Serialize the configuration of a project into an in-memory binary file.

    The configuration is written in INI format with a single section named
    after ``project.project_id`` holding the items of ``project.config``.

    Returns an ``io.BytesIO`` with the UTF-8 encoded text, positioned at the
    start, as binary file objects are needed for upload.
    """
    config = configparser.ConfigParser()
    config[project.project_id] = project.config
    # ConfigParser.write() needs a text-mode target. An in-memory buffer
    # avoids both an unclosed temporary file and a round trip through the
    # locale encoding, which can fail on non-ASCII values.
    text_buffer = io.StringIO()
    config.write(text_buffer)
    return io.BytesIO(text_buffer.getvalue().encode("utf8"))
270
271
272
def upload_to_hf_hub(fileobj, filename, repo_id, token, commit_message):
    """Upload a file object to a Hugging Face Hub repository.

    ``fileobj`` is stored as ``filename`` in the repository ``repo_id`` using
    the given access token and commit message.

    Raises OperationFailedException (chained to the original error) when the
    Hub reports an HTTP error or a validation error.
    """
    api = HfApi()
    try:
        api.upload_file(
            path_or_fileobj=fileobj,
            path_in_repo=filename,
            repo_id=repo_id,
            token=token,
            commit_message=commit_message,
        )
    except (HfHubHTTPError, HFValidationError) as err:
        raise OperationFailedException(str(err)) from err
284
285
286
def get_selected_project_ids_from_hf_hub(project_ids_pattern, repo_id, token, revision):
    """Return the IDs of the projects in a Hub repository that match a pattern.

    Repository files named ``projects/<id>.zip`` are considered, where
    ``<id>`` is matched against the shell-style ``project_ids_pattern``.
    """
    prefix, suffix = "projects/", ".zip"
    all_repo_file_paths = _list_files_in_hf_hub(repo_id, token, revision)
    return [
        # A matching path always starts with the prefix and ends with the
        # suffix, so slicing them off is safe. Unlike splitting, it also
        # keeps IDs that themselves contain ".zip" or "projects/" intact.
        path[len(prefix) : -len(suffix)]
        for path in all_repo_file_paths
        if fnmatch(path, f"{prefix}{project_ids_pattern}{suffix}")
    ]
293
294
295
def _list_files_in_hf_hub(repo_id, token, revision):
    """Return the paths of the files in a Hugging Face Hub repository as a list.

    Raises OperationFailedException (chained to the original error) when the
    Hub reports an HTTP error or a validation error, in line with the upload
    and download helpers of this module.
    """
    try:
        return list(list_repo_files(repo_id=repo_id, token=token, revision=revision))
    except (HfHubHTTPError, HFValidationError) as err:
        raise OperationFailedException(str(err)) from err
300
301
302
def download_from_hf_hub(filename, repo_id, token, revision):
    """Download a file from a Hugging Face Hub repository.

    Returns the value of ``hf_hub_download`` for the given file, repository,
    token and revision.

    Raises OperationFailedException (chained to the original error) when the
    Hub reports an HTTP error or a validation error.
    """
    try:
        return hf_hub_download(
            repo_id=repo_id,
            filename=filename,
            token=token,
            revision=revision,
        )
    except (HfHubHTTPError, HFValidationError) as err:
        raise OperationFailedException(str(err)) from err
312
313
314
def unzip(source_path):
    """Extract all members of the zip archive into the current directory.

    Existing files are overwritten.
    """
    archive = zipfile.ZipFile(source_path, mode="r")
    try:
        archive.extractall()  # TODO Disallow overwrite
    finally:
        archive.close()
317
318
319
def get_vocab_id(config_path):
    """Return the ``vocab`` setting of the first section in a config file.

    The file is read as UTF-8 (a leading BOM is tolerated), matching the
    encoding produced by ``write_config``.

    Raises ConfigurationException when the file has no sections (which is
    also the case when it cannot be read) or when the first section has no
    ``vocab`` setting.
    """
    config = configparser.ConfigParser()
    # ConfigParser.read() silently skips files it cannot open, so a missing
    # file shows up below as a configuration without sections.
    config.read(config_path, encoding="utf-8-sig")
    sections = config.sections()
    if not sections:
        raise ConfigurationException(
            f"No project section found in configuration file '{config_path}'."
        )
    section = sections[0]
    try:
        return config[section]["vocab"]
    except KeyError:
        raise ConfigurationException(
            f"Section '{section}' of configuration file '{config_path}' "
            "has no vocab setting."
        ) from None
324
325
326
def _get_completion_choices(
327
    param: Argument,
328
) -> dict[str, AnnifVocabulary] | dict[str, AnnifProject] | list:
329
    if param.name == "project_id":
330
        return annif.registry.get_projects()
331
    elif param.name == "vocab_id":
332
        return annif.registry.get_vocabs()
333
    else:
334
        return []
335
336
337
def complete_param(ctx: Context, param: Argument, incomplete: str) -> list[str]:
    """Shell completion callback for project and vocabulary ID arguments.

    Within the application context, returns the completion choices of the
    argument that start with the ``incomplete`` text typed so far.
    """
    matches = []
    with ctx.obj.load_app().app_context():
        for candidate in _get_completion_choices(param):
            if candidate.startswith(incomplete):
                matches.append(candidate)
    return matches
344