Passed
Pull Request — main (#762)
by Juho
02:38
created

annif.cli_util   B

Complexity

Total Complexity 52

Size/Duplication

Total Lines 309
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 200
dl 0
loc 309
rs 7.44
c 0
b 0
f 0
wmc 52

22 Functions

Rating   Name   Duplication   Size   Complexity  
A format_datetime() 0 5 2
A get_vocab() 0 9 2
A archive_dirs() 0 11 4
A _set_project_config_file_path() 0 7 3
A make_list_template() 0 14 3
A upload_to_hf_hub() 0 15 3
A backend_param_option() 0 9 1
A docs_limit_option() 0 10 1
A _validate_backend_params() 0 5 2
A write_tmp_project_configs_file() 0 6 3
A common_options() 0 12 1
A remove_tmp_files() 0 3 2
A generate_filter_params() 0 4 1
A get_project() 0 8 2
B open_documents() 0 31 5
A _get_completion_choices() 0 9 3
A complete_param() 0 6 2
A is_train_file() 0 6 3
A parse_backend_params() 0 12 2
A open_text_documents() 0 18 4
A project_id() 0 3 1
A show_hits() 0 20 2

How to fix   Complexity   

Complexity

Complex classes like annif.cli_util often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Utility functions for Annif CLI commands"""
2
from __future__ import annotations
3
4
import collections
5
import configparser
6
import itertools
7
import os
8
import pathlib
9
import sys
10
import zipfile
11
from typing import TYPE_CHECKING
12
13
import click
14
import click_log
15
from flask import current_app
16
from huggingface_hub import HfApi
17
from huggingface_hub.utils import HfHubHTTPError
18
19
import annif
20
from annif.exception import ConfigurationException, OperationFailedException
21
from annif.project import Access
22
23
if TYPE_CHECKING:
24
    from datetime import datetime
25
    from io import TextIOWrapper
26
27
    from click.core import Argument, Context, Option
28
29
    from annif.corpus.document import DocumentCorpus, DocumentList
30
    from annif.corpus.subject import SubjectIndex
31
    from annif.project import AnnifProject
32
    from annif.suggestion import SuggestionResult
33
    from annif.vocab import AnnifVocabulary
34
35
# Package-wide Annif logger, shared by the CLI helper functions in this module
logger = annif.logger
36
37
38
def _set_project_config_file_path(
    ctx: Context, param: Option, value: str | None
) -> None:
    """Click option callback: store the projects path given on the command line.

    The application is always loaded and its app context entered; the
    ``PROJECTS_CONFIG_PATH`` setting is overwritten only when a value was given."""
    app = ctx.obj.load_app()
    with app.app_context():
        if not value:
            # Option not used: leave the existing configuration untouched
            return
        current_app.config["PROJECTS_CONFIG_PATH"] = value
45
46
47
def common_options(f):
    """Decorator that attaches the options shared by all CLI commands.

    Adds the eager ``-p/--projects`` option, whose value is consumed by the
    ``_set_project_config_file_path`` callback instead of being passed to the
    command function, and wraps the result with the click_log verbosity option."""
    projects_option = click.option(
        "-p",
        "--projects",
        help="Set path to project configuration file or directory",
        type=click.Path(dir_okay=True, exists=True),
        callback=_set_project_config_file_path,
        expose_value=False,
        is_eager=True,
    )
    verbosity_option = click_log.simple_verbosity_option(logger)
    return verbosity_option(projects_option(f))
59
60
61
def project_id(f):
    """Decorator that adds a ``project_id`` argument to a CLI command; shell
    completion of the argument is delegated to ``complete_param``"""
    add_argument = click.argument("project_id", shell_complete=complete_param)
    return add_argument(f)
64
65
66
def backend_param_option(f):
    """Decorator that adds the repeatable ``--backend-param``/``-b`` option, used
    for overriding backend parameters of the config file, to a CLI command"""
    help_text = (
        "Override backend parameter of the config file. "
        "Syntax: `-b <backend>.<parameter>=<value>`."
    )
    add_option = click.option(
        "--backend-param",
        "-b",
        multiple=True,
        help=help_text,
    )
    return add_option(f)
75
76
77
def docs_limit_option(f):
    """Decorator that adds the ``--docs-limit``/``-d`` option to a CLI command.

    The option takes a non-negative integer giving the maximum number of documents
    to use and defaults to None."""
    non_negative_int = click.IntRange(min=0, max=None)
    add_option = click.option(
        "--docs-limit",
        "-d",
        default=None,
        type=non_negative_int,
        help="Maximum number of documents to use",
    )
    return add_option(f)
87
88
89
def get_project(project_id: str) -> AnnifProject:
    """
    Look up a project by its ID in the Annif registry (private access level).

    If the registry raises ValueError for the ID, an error message is written to
    stderr and the process exits with status 1."""
    try:
        project = annif.registry.get_project(project_id, min_access=Access.private)
    except ValueError:
        click.echo(f"No projects found with id '{project_id}'.", err=True)
        sys.exit(1)
    return project
97
98
99
def get_vocab(vocab_id: str) -> AnnifVocabulary:
    """
    Look up a vocabulary by its ID in the Annif registry (private access level).

    If the registry raises ValueError for the ID, an error message is written to
    stderr and the process exits with status 1."""
    try:
        vocab = annif.registry.get_vocab(vocab_id, min_access=Access.private)
    except ValueError:
        message = f"No vocabularies found with the id '{vocab_id}'."
        click.echo(message, err=True)
        sys.exit(1)
    return vocab
108
109
110
def make_list_template(*rows) -> str:
    """Build a ``str.format`` template for printing the given rows as aligned
    columns.

    Every field is left-aligned and padded to the length of the longest item found
    in that position across the rows; fields are separated by two spaces. Rows may
    have different lengths. Without any rows the template is an empty string."""
    widths = []
    for row in rows:
        for position, item in enumerate(row):
            item_len = len(item)
            if position == len(widths):
                # First time this column is seen
                widths.append(item_len)
            elif item_len > widths[position]:
                widths[position] = item_len

    fields = (f"{{{position}: <{width}}}" for position, width in enumerate(widths))
    return "  ".join(fields)
126
127
128
def format_datetime(dt: datetime | None) -> str:
    """Render a datetime in local time as ``YYYY-MM-DD HH:MM:SS``; a missing
    value (None) is rendered as ``-``."""
    return "-" if dt is None else dt.astimezone().strftime("%Y-%m-%d %H:%M:%S")
133
134
135
def open_documents(
    paths: tuple[str, ...],
    subject_index: SubjectIndex,
    vocab_lang: str,
    docs_limit: int | None,
) -> DocumentCorpus:
    """Open a document corpus from the given pathnames.

    A path that is a directory is opened as a DocumentDirectory (with subjects
    required, using the given vocabulary language); any other path is opened as a
    DocumentFile. With no paths, a warning is logged and the null device is read
    instead; with several paths, the corpora are wrapped in a CombinedCorpus. When
    docs_limit is not None, the result is further wrapped in a
    LimitingDocumentCorpus."""

    def _open_single(path):
        """Open one path as a corpus, choosing the type by directory vs. file"""
        if not os.path.isdir(path):
            return annif.corpus.DocumentFile(path, subject_index)
        return annif.corpus.DocumentDirectory(
            path, subject_index, vocab_lang, require_subjects=True
        )

    if not paths:
        logger.warning("Reading empty file")
        corpus = _open_single(os.path.devnull)
    else:
        corpora = [_open_single(path) for path in paths]
        # A single corpus is returned as such, without the combining wrapper
        if len(corpora) == 1:
            corpus = corpora[0]
        else:
            corpus = annif.corpus.CombinedCorpus(corpora)

    if docs_limit is None:
        return corpus
    return annif.corpus.LimitingDocumentCorpus(corpus, docs_limit)
166
167
168
def open_text_documents(paths: tuple[str, ...], docs_limit: int | None) -> DocumentList:
    """
    Wrap the texts found at the given paths in a DocumentList.

    Each document is created without subjects. The path "-" means that the text is
    read from standard input; other paths are opened as UTF-8 files (a leading BOM
    is skipped and undecodable bytes are replaced). Only the first docs_limit paths
    are used; None means no limit. Files are read lazily, as the documents are
    iterated.
    """

    def _read_text(path):
        """Return the text behind a single path ("-" is standard input)"""
        if path == "-":
            return sys.stdin.read()
        with open(path, errors="replace", encoding="utf-8-sig") as docfile:
            return docfile.read()

    selected_paths = paths[:docs_limit]
    documents = (
        annif.corpus.Document(text=_read_text(path), subject_set=None)
        for path in selected_paths
    )
    return annif.corpus.DocumentList(documents)
186
187
188
def show_hits(
    hits: SuggestionResult,
    project: AnnifProject,
    lang: str,
    file: TextIOWrapper | None = None,
) -> None:
    """
    Write subject suggestions with click.echo, one tab-separated row per hit.

    A row consists of the subject URI in angle brackets, the label in the given
    language, the notation (only if the subject has one) and the score with four
    decimals. The rows go to the given file, or to click's default output when file
    is None.
    """
    for hit in hits:
        subject = project.subjects[hit.subject_id]
        # Empty parts (e.g. a missing notation) are dropped from the row
        label_and_notation = "\t".join(
            part for part in (subject.labels[lang], subject.notation) if part
        )
        row = f"<{subject.uri}>\t{label_and_notation}\t{hit.score:.04f}"
        click.echo(row, file=file)
208
209
210
def parse_backend_params(
    backend_param: tuple[str, ...] | tuple[()], project: AnnifProject
) -> collections.defaultdict[str, dict[str, str]]:
    """Turn the values of the --backend-param option into a nested dict.

    Each value has the form ``<backend>.<parameter>=<value>`` and is split at the
    first "." and the first following "=". The result maps backend name to a dict
    of parameter name to value (values stay strings). Every entry is checked
    against the project with _validate_backend_params. A value without "." or "="
    raises ValueError."""
    parsed = collections.defaultdict(dict)
    for option_value in backend_param:
        backend_name, assignment = option_value.split(".", 1)
        param_name, param_value = assignment.split("=", 1)
        _validate_backend_params(backend_name, option_value, project)
        parsed[backend_name][param_name] = param_value
    return parsed
222
223
224
def _validate_backend_params(backend: str, beparam: str, project: AnnifProject) -> None:
    """Check that a backend named in a ``-b`` option is the backend of the project.

    Raises ConfigurationException when the names differ; beparam is the full
    option value, used only in the error message."""
    project_backend = project.config["backend"]
    if backend != project_backend:
        raise ConfigurationException(
            f'The backend {backend} in CLI option "-b {beparam}" not matching the'
            f" project backend {project_backend}."
        )
230
231
232
def generate_filter_params(filter_batch_max_limit: int) -> list[tuple[int, float]]:
    """Return every (limit, threshold) combination to try as filter parameters.

    Limits run from 1 to filter_batch_max_limit inclusive and thresholds from 0.0
    in twenty steps of 0.05; the pairs are ordered by limit first, then by
    threshold."""
    thresholds = [step * 0.05 for step in range(20)]
    return [
        (limit, threshold)
        for limit in range(1, filter_batch_max_limit + 1)
        for threshold in thresholds
    ]
236
237
238
def is_train_file(fname):
    """Return True if the file name contains "-train" or "tmp-" anywhere in it,
    False otherwise."""
    return any(pattern in fname for pattern in ("-train", "tmp-"))
244
245
246
# Filename prefix of the temporary files created for uploads; remove_tmp_files()
# deletes every entry in the current directory whose name starts with it
TMPF_PREFIX = "tmp-upload-"
247
248
249
def archive_dirs(dirs, zip_fname):
    """Create a zip archive from the direct entries of the given directories.

    The archive is written to the current directory under the name zip_fname
    prefixed with TMPF_PREFIX. Entries whose name is matched by is_train_file are
    left out; the others are stored under the same path they were read from."""
    logger.debug(f"Creating archive {zip_fname}")
    archive_path = TMPF_PREFIX + zip_fname
    with zipfile.ZipFile(archive_path, mode="w") as archive:
        for dirname in dirs:
            # List the directory up front, before anything is written
            entries = list(pathlib.Path(dirname).iterdir())
            for entry in entries:
                if is_train_file(entry.name):
                    continue
                logger.debug(f"Adding {entry}")
                archive.write(entry, arcname=entry)
260
261
262
def upload_to_hf_hub(fname, repo_id, token, commit_message):
    """Upload a temporary file to a Hugging Face Hub repository.

    The local file is expected under the name fname prefixed with TMPF_PREFIX and
    is stored in the repository as fname. When commit_message is None, a default
    message mentioning the file name is used.

    Raises OperationFailedException if the Hub responds with an HTTP error; the
    original HfHubHTTPError is kept as the cause of the raised exception."""
    if commit_message is None:
        commit_message = f"Upload {fname} with Annif"
    api = HfApi()
    try:
        api.upload_file(
            path_or_fileobj=TMPF_PREFIX + fname,
            path_in_repo=fname,
            repo_id=repo_id,
            token=token,
            commit_message=commit_message,
        )
    except HfHubHTTPError as err:
        # Chain explicitly so the traceback shows the HTTP error as the direct cause
        raise OperationFailedException(str(err)) from err
277
278
279
def write_tmp_project_configs_file(projects, projects_conf_fname):
    """Write the configurations of the given projects into a temporary INI file.

    Each project becomes a section named by its project_id, with its config as the
    section content. The file is created in the current directory under the name
    projects_conf_fname prefixed with TMPF_PREFIX."""
    config = configparser.ConfigParser()
    for proj in projects:
        config[proj.project_id] = proj.config
    # Fix the encoding explicitly: without it the platform default is used, so
    # non-ASCII configuration values could be written differently per machine.
    # NOTE(review): UTF-8 is assumed to be what readers of this file expect - confirm
    with open(
        TMPF_PREFIX + projects_conf_fname, "w", encoding="utf-8"
    ) as tmp_projects_file:
        config.write(tmp_projects_file)
285
286
287
def remove_tmp_files():
    """Delete every entry in the current directory whose name starts with
    TMPF_PREFIX."""
    current_dir = pathlib.Path(".")
    for leftover in current_dir.glob(f"{TMPF_PREFIX}*"):
        leftover.unlink()
290
291
292
def _get_completion_choices(
    param: Argument,
) -> dict[str, AnnifVocabulary] | dict[str, AnnifProject] | list:
    """Return the candidates for shell completion of the given parameter.

    The registry's projects are returned for ``project_id`` and its vocabularies
    for ``vocab_id``; any other parameter gets an empty list."""
    if param.name == "project_id":
        return annif.registry.get_projects()
    if param.name == "vocab_id":
        return annif.registry.get_vocabs()
    return []
301
302
303
def complete_param(ctx: Context, param: Argument, incomplete: str) -> list[str]:
    """Shell completion callback: list the choices available for the parameter
    that start with the text typed so far.

    The choices come from _get_completion_choices and are looked up inside the
    application context of the loaded app."""
    matches = []
    app = ctx.obj.load_app()
    with app.app_context():
        for candidate in _get_completion_choices(param):
            if candidate.startswith(incomplete):
                matches.append(candidate)
    return matches
310