annif.cli_util.parse_backend_params()   A
last analyzed

Complexity

Conditions 2

Size

Total Lines 12
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 9
nop 2
dl 0
loc 12
rs 9.95
c 0
b 0
f 0
1
"""Utility functions for Annif CLI commands"""
2
3
from __future__ import annotations
4
5
import collections
6
import itertools
7
import os
8
import sys
9
from typing import TYPE_CHECKING
10
11
import click
12
import click_log
13
from flask import current_app
14
15
import annif
16
from annif.exception import ConfigurationException
17
from annif.project import Access
18
19
if TYPE_CHECKING:
20
    import io
21
    from datetime import datetime
22
23
    from click.core import Argument, Context, Option
24
25
    from annif.corpus.document import DocumentCorpus, DocumentList
26
    from annif.corpus.subject import SubjectIndex
27
    from annif.project import AnnifProject
28
    from annif.suggestion import SuggestionResult
29
    from annif.vocab import AnnifVocabulary
30
31
# Module-level alias of the annif package logger; in this module it is passed to
# the click_log verbosity option and used for warnings.
logger = annif.logger
32
33
34
def _set_project_config_file_path(
    ctx: Context, param: Option, value: str | None
) -> None:
    """Click option callback: store the projects configuration path given on the
    command line in the application config, overriding the default path or the
    path given in the environment."""
    app = ctx.obj.load_app()
    with app.app_context():
        # Nothing to override when the option was not given
        if not value:
            return
        current_app.config["PROJECTS_CONFIG_PATH"] = value
41
42
43
def common_options(f):
    """Decorator that attaches the options common to all CLI commands: the
    -p/--projects option and the verbosity option provided by click_log."""
    projects_option = click.option(
        "-p",
        "--projects",
        help="Set path to project configuration file or directory",
        type=click.Path(dir_okay=True, exists=True),
        callback=_set_project_config_file_path,
        expose_value=False,
        is_eager=True,
    )
    with_projects = projects_option(f)
    verbosity_option = click_log.simple_verbosity_option(logger)
    return verbosity_option(with_projects)
55
56
57
def project_id(f):
    """Decorator that adds a "project_id" argument to a CLI command, with shell
    completion delegated to complete_param."""
    add_argument = click.argument("project_id", shell_complete=complete_param)
    return add_argument(f)
60
61
62
def backend_param_option(f):
    """Decorator that adds a repeatable -b/--backend-param option, letting CLI
    commands override backend parameters of the configuration file."""
    help_text = (
        "Override backend parameter of the config file. "
        "Syntax: `-b <backend>.<parameter>=<value>`."
    )
    add_option = click.option("--backend-param", "-b", multiple=True, help=help_text)
    return add_option(f)
71
72
73
def docs_limit_option(f):
    """Decorator that adds a -d/--docs-limit option to a CLI command for limiting
    the number of documents to use. The value is an integer with a lower bound
    of 0 and defaults to None."""
    limit_type = click.IntRange(0, None)
    add_option = click.option(
        "--docs-limit",
        "-d",
        default=None,
        type=limit_type,
        help="Maximum number of documents to use",
    )
    return add_option(f)
83
84
85
def get_project(project_id: str) -> AnnifProject:
    """Fetch a project from the registry by its ID, with min_access set to
    Access.private.

    If the registry raises ValueError for the ID, print an error message to
    stderr and exit with status 1."""
    try:
        project = annif.registry.get_project(project_id, min_access=Access.private)
    except ValueError:
        click.echo(f"No projects found with id '{project_id}'.", err=True)
        sys.exit(1)
    return project
93
94
95
def get_vocab(vocab_id: str) -> AnnifVocabulary:
    """Fetch a vocabulary from the registry by its ID, with min_access set to
    Access.private.

    If the registry raises ValueError for the ID, print an error message to
    stderr and exit with status 1."""
    try:
        vocab = annif.registry.get_vocab(vocab_id, min_access=Access.private)
    except ValueError:
        message = f"No vocabularies found with the id '{vocab_id}'."
        click.echo(message, err=True)
        sys.exit(1)
    return vocab
104
105
106
def make_list_template(*rows) -> str:
    """Build a format template for printing the given rows as aligned columns.

    Each field becomes a left-aligned replacement field padded to the length of
    the longest item found at that position in any row; fields are separated by
    two spaces. Rows may have differing numbers of items."""
    widths = []
    for row in rows:
        for position, item in enumerate(row):
            item_width = len(item)
            if position == len(widths):
                # First time a field is seen at this position
                widths.append(item_width)
            elif item_width > widths[position]:
                widths[position] = item_width

    fields = [f"{{{position}: <{width}}}" for position, width in enumerate(widths)]
    return "  ".join(fields)
122
123
124
def format_datetime(dt: datetime | None) -> str:
125
    """Helper function to format a datetime object as a string in the local time."""
126
    if dt is None:
127
        return "-"
128
    return dt.astimezone().strftime("%Y-%m-%d %H:%M:%S")
129
130
131
def open_documents(
    paths: tuple[str, ...],
    subject_index: SubjectIndex,
    vocab_lang: str,
    docs_limit: int | None,
) -> DocumentCorpus:
    """Open a document corpus from a list of pathnames.

    Each pathname is either a TSV file or a directory of TXT files. For
    directories with subjects in TSV files, the given vocabulary language is
    used to convert subject labels into URIs. With no paths, a warning is logged
    and the null device is opened instead; with several paths, the corpora are
    combined. If docs_limit is not None, the result is wrapped in a
    LimitingDocumentCorpus."""

    def _open_single(path):
        """Open one path as a corpus: a directory or a single file."""
        if not os.path.isdir(path):
            return annif.corpus.DocumentFile(path, subject_index)
        return annif.corpus.DocumentDirectory(
            path, subject_index, vocab_lang, require_subjects=True
        )

    if not paths:
        logger.warning("Reading empty file")
        corpus = _open_single(os.path.devnull)
    elif len(paths) == 1:
        corpus = _open_single(paths[0])
    else:
        corpus = annif.corpus.CombinedCorpus([_open_single(path) for path in paths])

    if docs_limit is None:
        return corpus
    return annif.corpus.LimitingDocumentCorpus(corpus, docs_limit)
162
163
164
def open_text_documents(paths: tuple[str, ...], docs_limit: int | None) -> DocumentList:
    """
    Read text documents from the given file paths into a DocumentList whose
    Documents have no subjects. The path "-" means that the document text is
    read from standard input. Only the first docs_limit paths are used; all of
    them when docs_limit is None.
    """

    def _read_text(path):
        """Return the text content of one path (standard input for "-")."""
        if path == "-":
            return sys.stdin.read()
        with open(path, errors="replace", encoding="utf-8-sig") as docfile:
            return docfile.read()

    selected_paths = paths[:docs_limit]
    # Lazy generator: each file is read only when the DocumentList consumes it
    documents = (
        annif.corpus.Document(text=_read_text(path), subject_set=None)
        for path in selected_paths
    )
    return annif.corpus.DocumentList(documents)
182
183
184
def show_hits(
    hits: SuggestionResult,
    project: AnnifProject,
    lang: str,
    file: io.TextIOWrapper | None = None,
) -> None:
    """
    Print subject suggestions to the console or a file, one line per hit. Each
    line holds the subject URI in angle brackets, the label in the specified
    language, the notation if there is one, and the score with four decimals,
    separated by tabs.
    """
    for hit in hits:
        subject = project.subjects[hit.subject_id]
        uri = subject.uri
        label, notation = subject.labels[lang], subject.notation
        # Leave out an empty label or notation rather than printing an empty column
        names = "\t".join(part for part in (label, notation) if part)
        click.echo(f"<{uri}>\t{names}\t{hit.score:.04f}", file=file)
204
205
206
def parse_backend_params(
    backend_param: tuple[str, ...] | tuple[()], project: AnnifProject
) -> collections.defaultdict[str, dict[str, str]]:
    """Parse the values given with the --backend-param option into a nested dict.

    Each value must have the form "<backend>.<parameter>=<value>". The result
    maps backend -> parameter -> value, with values kept as strings. Only the
    first "." and the first "=" following it act as separators, so the value
    part may itself contain these characters.

    Raises ConfigurationException if a value is not of the expected form, or if
    its backend does not match the backend of the given project."""
    backend_params = collections.defaultdict(dict)
    for beparam in backend_param:
        backend, dot, param = beparam.partition(".")
        key, equals, val = param.partition("=")
        if not dot or not equals:
            # Report malformed input as a configuration error instead of letting
            # tuple unpacking fail with an uninformative ValueError
            raise ConfigurationException(
                f'The CLI option "-b {beparam}" is not of the form '
                "<backend>.<parameter>=<value>."
            )
        _validate_backend_params(backend, beparam, project)
        backend_params[backend][key] = val
    return backend_params
218
219
220
def _validate_backend_params(backend: str, beparam: str, project: AnnifProject) -> None:
    """Check that the backend named in a --backend-param value is the backend
    configured for the project; raise ConfigurationException if it is not."""
    project_backend = project.config["backend"]
    if backend == project_backend:
        return
    raise ConfigurationException(
        f'The backend {backend} in CLI option "-b {beparam}" not matching the project'
        f" backend {project_backend}."
    )
226
227
228
def generate_filter_params(filter_batch_max_limit: int) -> list[tuple[int, float]]:
    """Return all (limit, threshold) combinations, where limit runs from 1 to
    filter_batch_max_limit and threshold takes the values i * 0.05 for i in
    0..19. The result is ordered by limit first, then by threshold."""
    return [
        (limit, step * 0.05)
        for limit in range(1, filter_batch_max_limit + 1)
        for step in range(20)
    ]
232
233
234
def _get_completion_choices(
    param: Argument,
) -> dict[str, AnnifVocabulary] | dict[str, AnnifProject] | list:
    """Return the candidates for shell completion of the given parameter: the
    registry's projects for "project_id" and "project_ids_pattern", its
    vocabularies for "vocab_id", and an empty list for any other parameter."""
    name = param.name
    if name == "vocab_id":
        return annif.registry.get_vocabs()
    if name in ("project_id", "project_ids_pattern"):
        return annif.registry.get_projects()
    return []
243
244
245
def complete_param(ctx: Context, param: Argument, incomplete: str) -> list[str]:
    """Shell completion callback: return the completion choices of the given
    parameter that start with the incomplete text. The choices are looked up
    inside the application context."""
    with ctx.obj.load_app().app_context():
        matches = []
        for candidate in _get_completion_choices(param):
            if candidate.startswith(incomplete):
                matches.append(candidate)
        return matches
252