Passed
Push — upgrade-to-connexion3 ( e417e0...5d7ec9 )
by Juho
09:39 queued 05:10
created

annif.cli_util.make_list_template()   A

Complexity

Conditions 3

Size

Total Lines 14
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 9
nop 1
dl 0
loc 14
rs 9.95
c 0
b 0
f 0
1
"""Utility functions for Annif CLI commands"""
2
from __future__ import annotations
3
4
import collections
5
import itertools
6
import os
7
import sys
8
from typing import TYPE_CHECKING
9
10
import click
11
import click_log
12
from flask import current_app
13
14
import annif
15
from annif.exception import ConfigurationException
16
from annif.project import Access
17
18
if TYPE_CHECKING:
19
    from datetime import datetime
20
    from io import TextIOWrapper
21
22
    from click.core import Argument, Context, Option
23
24
    from annif.corpus.document import DocumentCorpus, DocumentList
25
    from annif.corpus.subject import SubjectIndex
26
    from annif.project import AnnifProject
27
    from annif.suggestion import SuggestionResult
28
    from annif.vocab import AnnifVocabulary
29
30
logger = annif.logger
31
32
33
def _set_project_config_file_path(
    ctx: Context, param: Option, value: str | None
) -> None:
    """Callback for the --projects option: when a value was given on the command
    line, store it as PROJECTS_CONFIG_PATH in the application config, replacing
    whatever path was configured there before."""
    app = ctx.obj.load_app()
    with app.app_context():
        if not value:
            # Option not given: leave the configured path untouched
            return
        current_app.config["PROJECTS_CONFIG_PATH"] = value
40
41
42
def common_options(f):
    """Decorator that attaches the options shared by all CLI commands: the
    -p/--projects option and the logging verbosity option."""
    projects_option = click.option(
        "-p",
        "--projects",
        help="Set path to project configuration file or directory",
        type=click.Path(dir_okay=True, exists=True),
        callback=_set_project_config_file_path,
        expose_value=False,
        is_eager=True,
    )
    verbosity_option = click_log.simple_verbosity_option(logger)
    # Apply the projects option first, then wrap the result with verbosity
    return verbosity_option(projects_option(f))
54
55
56
def project_id(f):
    """Decorator that adds a "project_id" argument, with shell completion provided
    by complete_param, to a CLI command"""
    project_id_argument = click.argument("project_id", shell_complete=complete_param)
    return project_id_argument(f)
59
60
61
def backend_param_option(f):
    """Decorator that adds the repeatable -b/--backend-param option, used for
    overriding backend parameters of the config file, to a CLI command"""
    help_text = (
        "Override backend parameter of the config file. "
        "Syntax: `-b <backend>.<parameter>=<value>`."
    )
    add_option = click.option(
        "--backend-param",
        "-b",
        multiple=True,
        help=help_text,
    )
    return add_option(f)
70
71
72
def docs_limit_option(f):
    """Decorator that adds the -d/--docs-limit option (an integer of at least 0,
    unset by default) for limiting the number of documents a CLI command uses"""
    limit_type = click.IntRange(0, None)
    add_option = click.option(
        "--docs-limit",
        "-d",
        default=None,
        type=limit_type,
        help="Maximum number of documents to use",
    )
    return add_option(f)
82
83
84
def get_project(project_id: str) -> AnnifProject:
    """Look up a project by its ID in the registry (private access level is
    enough). If the lookup fails with ValueError, print an error message to
    standard error and exit with status 1."""
    try:
        project = annif.registry.get_project(project_id, min_access=Access.private)
    except ValueError:
        message = "No projects found with id '{0}'.".format(project_id)
        click.echo(message, err=True)
        sys.exit(1)
    return project
92
93
94
def get_vocab(vocab_id: str) -> AnnifVocabulary:
    """Look up a vocabulary by its ID in the registry (private access level is
    enough). If the lookup fails with ValueError, print an error message to
    standard error and exit with status 1."""
    try:
        vocab = annif.registry.get_vocab(vocab_id, min_access=Access.private)
    except ValueError:
        message = f"No vocabularies found with the id '{vocab_id}'."
        click.echo(message, err=True)
        sys.exit(1)
    return vocab
103
104
105
def make_list_template(*rows) -> str:
    """Build a str.format() template for printing the given rows as a list with
    aligned columns. Every field is left-aligned and padded to the length of the
    longest item found at that position in any row; fields are separated by two
    spaces. Rows may have different numbers of items."""

    # Transpose the rows into columns; the empty-string filler used for shorter
    # rows has length 0, so it never affects a column's width.
    columns = itertools.zip_longest(*rows, fillvalue="")
    column_widths = [max(len(item) for item in column) for column in columns]

    placeholders = []
    for position, width in enumerate(column_widths):
        placeholders.append(f"{{{position}: <{width}}}")
    return "  ".join(placeholders)
121
122
123
def format_datetime(dt: datetime | None) -> str:
    """Return the given datetime converted to the local time zone and formatted as
    "YYYY-MM-DD HH:MM:SS". When no datetime is given, return "-" instead."""
    return "-" if dt is None else dt.astimezone().strftime("%Y-%m-%d %H:%M:%S")
128
129
130
def open_documents(
    paths: tuple[str, ...],
    subject_index: SubjectIndex,
    vocab_lang: str,
    docs_limit: int | None,
) -> DocumentCorpus:
    """Open a document corpus from the given pathnames. A path that is a directory
    is opened as a DocumentDirectory (with subjects required, using the given
    vocabulary language); any other path is opened as a DocumentFile. Several
    paths are combined into a CombinedCorpus, and no paths at all gives an empty
    corpus with a warning. If docs_limit is set, the corpus is wrapped in a
    LimitingDocumentCorpus."""

    def _open_single(path):
        """open one path as a corpus, chosen by whether it is a directory"""
        if not os.path.isdir(path):
            return annif.corpus.DocumentFile(path, subject_index)
        return annif.corpus.DocumentDirectory(
            path, subject_index, vocab_lang, require_subjects=True
        )

    if not paths:
        # Nothing given: read the null device to obtain an empty corpus
        logger.warning("Reading empty file")
        corpus = _open_single(os.path.devnull)
    elif len(paths) > 1:
        corpus = annif.corpus.CombinedCorpus([_open_single(path) for path in paths])
    else:
        corpus = _open_single(paths[0])

    if docs_limit is None:
        return corpus
    return annif.corpus.LimitingDocumentCorpus(corpus, docs_limit)
161
162
163
def open_text_documents(paths: tuple[str, ...], docs_limit: int | None) -> DocumentList:
    """
    Create a DocumentList of subjectless Documents, one per given path. The text
    of each document is read from the file at that path, or from standard input
    when the path is "-". At most docs_limit paths (counted from the start) are
    used; all of them when docs_limit is None. Reading happens lazily, as the
    documents are iterated.
    """

    def _read_text(path):
        if path == "-":
            return sys.stdin.read()
        # utf-8-sig drops a leading BOM; undecodable bytes are replaced
        with open(path, errors="replace", encoding="utf-8-sig") as docfile:
            return docfile.read()

    documents = (
        annif.corpus.Document(text=_read_text(path), subject_set=None)
        for path in paths[:docs_limit]
    )
    return annif.corpus.DocumentList(documents)
181
182
183
def show_hits(
    hits: SuggestionResult,
    project: AnnifProject,
    lang: str,
    file: TextIOWrapper | None = None,
) -> None:
    """
    Write the given subject suggestions with click.echo, one tab-separated line per
    hit, to the given file (or to click's default output when file is None). Each
    line holds the subject URI in angle brackets, the label in the given language,
    the notation if the subject has one, and the score with four decimals.
    """
    row_format = "<{}>\t{}\t{:.04f}"
    for hit in hits:
        subject = project.subjects[hit.subject_id]
        # Leave out empty values, e.g. the notation of a subject without one
        name_fields = [
            text for text in (subject.labels[lang], subject.notation) if text
        ]
        row = row_format.format(subject.uri, "\t".join(name_fields), hit.score)
        click.echo(row, file=file)
203
204
205
def parse_backend_params(
    backend_param: tuple[str, ...] | tuple[()], project: AnnifProject
) -> collections.defaultdict[str, dict[str, str]]:
    """Parse the values given with the --backend-param option into a nested dict
    structure.

    Each value is expected to have the form `<backend>.<parameter>=<value>`. The
    result maps each backend id to a dict of parameter names and their values;
    the values are kept as strings.

    Raises ConfigurationException if a value does not follow the expected syntax,
    or if its backend is not the backend of the given project."""
    backend_params = collections.defaultdict(dict)
    for beparam in backend_param:
        # Only the first "." and the first "=" after it act as separators, so
        # the value part may itself contain dots and equals signs.
        backend, dot, param = beparam.partition(".")
        key, equals, val = param.partition("=")
        if not dot or not equals:
            # Report malformed input as a configuration problem instead of
            # failing with a bare ValueError from tuple unpacking.
            raise ConfigurationException(
                f'Invalid syntax in CLI option "-b {beparam}". '
                "Expected syntax: `-b <backend>.<parameter>=<value>`."
            )
        _validate_backend_params(backend, beparam, project)
        backend_params[backend][key] = val
    return backend_params
217
218
219
def _validate_backend_params(backend: str, beparam: str, project: AnnifProject) -> None:
    """Check that the backend named in a --backend-param value is the backend
    configured for the project; raise ConfigurationException if it is not."""
    project_backend = project.config["backend"]
    if backend == project_backend:
        return
    message = (
        'The backend {} in CLI option "-b {}" not matching the project'
        " backend {}."
    ).format(backend, beparam, project_backend)
    raise ConfigurationException(message)
225
226
227
def generate_filter_params(filter_batch_max_limit: int) -> list[tuple[int, float]]:
    """Return every (limit, threshold) combination, where limit runs from 1 to
    filter_batch_max_limit (inclusive) and threshold takes the twenty values
    i * 0.05 for i = 0..19. The list is ordered by limit first, then by
    threshold."""
    thresholds = [step * 0.05 for step in range(20)]
    return [
        (limit, threshold)
        for limit in range(1, filter_batch_max_limit + 1)
        for threshold in thresholds
    ]
231
232
233
def _get_completion_choices(
    param: Argument,
) -> dict[str, AnnifVocabulary] | dict[str, AnnifProject] | list:
    """Return the shell completion candidates for the given parameter: the
    registry's projects for "project_id", its vocabularies for "vocab_id", and an
    empty list for any other parameter."""
    getters = {
        "project_id": annif.registry.get_projects,
        "vocab_id": annif.registry.get_vocabs,
    }
    getter = getters.get(param.name)
    if getter is None:
        return []
    return getter()
242
243
244
def complete_param(ctx: Context, param: Argument, incomplete: str) -> list[str]:
    """Shell completion function: return the completion choices available for the
    given parameter that start with the incomplete text typed so far. The choices
    are looked up within the application context."""
    app = ctx.obj.load_app()
    with app.app_context():
        matches = []
        for candidate in _get_completion_choices(param):
            if candidate.startswith(incomplete):
                matches.append(candidate)
    return matches
251