annif.cli_util.docs_limit_option()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 10
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 8
nop 1
dl 0
loc 10
rs 10
c 0
b 0
f 0
1
"""Utility functions for Annif CLI commands"""
2
3
from __future__ import annotations
4
5
import collections
6
import itertools
7
import os
8
import sys
9
from typing import TYPE_CHECKING
10
11
import click
12
import click_log
13
from flask import current_app
14
15
import annif
16
from annif.exception import ConfigurationException
17
from annif.project import Access
18
19
if TYPE_CHECKING:
20
    import io
21
    from datetime import datetime
22
23
    from click.core import Argument, Context, Option
24
25
    from annif.corpus.document import DocumentCorpus, DocumentList
26
    from annif.corpus.subject import SubjectIndex
27
    from annif.project import AnnifProject
28
    from annif.suggestion import SuggestionResult
29
    from annif.vocab import AnnifVocabulary
30
31
# Shared Annif logger; also handed to click_log for the CLI verbosity option.
logger = annif.logger
32
33
34
def _set_project_config_file_path(
    ctx: Context, param: Option, value: str | None
) -> None:
    """Override the default path or the path given in env by CLI option.

    Click option callback. When ``value`` is non-empty it is stored as
    ``PROJECTS_CONFIG_PATH`` in the config of the current Flask application.
    ``param`` is not used; it is part of the click callback signature.
    """
    # The application context is needed for ``current_app`` to resolve.
    with ctx.obj.load_app().app_context():
        if value:
            current_app.config["PROJECTS_CONFIG_PATH"] = value
41
42
43
def common_options(f):
    """Decorator to add common options for all CLI commands.

    Adds the ``-p``/``--projects`` option, which is processed eagerly by
    ``_set_project_config_file_path`` and not passed on to the command
    (``expose_value=False``), and the verbosity option from ``click_log``.
    """
    f = click.option(
        "-p",
        "--projects",
        help="Set path to project configuration file or directory",
        type=click.Path(dir_okay=True, exists=True),
        callback=_set_project_config_file_path,
        expose_value=False,
        is_eager=True,
    )(f)
    return click_log.simple_verbosity_option(logger)(f)
55
56
57
def project_id(f):
    """Decorator to add a project ID parameter to a CLI command.

    The argument is named ``project_id`` and uses ``complete_param`` for shell
    completion.
    """
    return click.argument("project_id", shell_complete=complete_param)(f)
60
61
62
def backend_param_option(f):
    """Decorator to add an option for CLI commands to override BE parameters.

    The ``--backend-param``/``-b`` option may be given multiple times
    (``multiple=True``).
    """
    return click.option(
        "--backend-param",
        "-b",
        multiple=True,
        help="Override backend parameter of the config file. "
        + "Syntax: `-b <backend>.<parameter>=<value>`.",
    )(f)
71
72
73
def docs_limit_option(f):
    """Decorator to add an option for CLI commands to limit the number of
    documents to use.

    The ``--docs-limit``/``-d`` option accepts a non-negative integer and
    defaults to ``None``.
    """
    return click.option(
        "--docs-limit",
        "-d",
        default=None,
        type=click.IntRange(0, None),
        help="Maximum number of documents to use",
    )(f)
83
84
85
def get_project(project_id: str) -> AnnifProject:
    """Helper function to get a project by ID and bail out if it doesn't exist.

    Looks the project up in the Annif registry with private access level. If
    the lookup raises ``ValueError``, an error message is written to stderr and
    the process exits with status 1.
    """
    try:
        return annif.registry.get_project(project_id, min_access=Access.private)
    except ValueError:
        click.echo(f"No projects found with id '{project_id}'.", err=True)
        sys.exit(1)
93
94
95
def get_vocab(vocab_id: str) -> AnnifVocabulary:
    """Helper function to get a vocabulary by ID and bail out if it doesn't
    exist.

    Looks the vocabulary up in the Annif registry with private access level. If
    the lookup raises ``ValueError``, an error message is written to stderr and
    the process exits with status 1.
    """
    try:
        return annif.registry.get_vocab(vocab_id, min_access=Access.private)
    except ValueError:
        click.echo(f"No vocabularies found with the id '{vocab_id}'.", err=True)
        sys.exit(1)
104
105
106
def make_list_template(*rows) -> str:
    """Helper function to create a template for a list of entries with fields of
    variable width. The width of each field is determined by the longest item in the
    field in the given rows.

    Each row is a sequence of strings. The result is a ``str.format`` template
    with one left-aligned positional placeholder per field, joined by two
    spaces, e.g. ``"{0: <3}  {1: <2}"``. With no rows the result is ``""``.
    """

    # Maps field index -> widest item seen so far; missing fields start at 0.
    max_field_widths = collections.defaultdict(int)
    for row in rows:
        for field_ind, item in enumerate(row):
            max_field_widths[field_ind] = max(max_field_widths[field_ind], len(item))

    return "  ".join(
        f"{{{field_ind}: <{field_width}}}"
        for field_ind, field_width in max_field_widths.items()
    )
122
123
124
def format_datetime(dt: datetime | None) -> str:
    """Helper function to format a datetime object as a string in the local time.

    Returns ``"-"`` when ``dt`` is ``None``; otherwise the value converted with
    ``astimezone()`` and formatted as ``YYYY-MM-DD HH:MM:SS``.
    """
    if dt is None:
        return "-"
    return dt.astimezone().strftime("%Y-%m-%d %H:%M:%S")
129
130
131
def open_documents(
    paths: tuple[str, ...],
    subject_index: SubjectIndex,
    vocab_lang: str,
    docs_limit: int | None,
) -> DocumentCorpus:
    """Helper function to open a document corpus from a list of pathnames,
    each of which is either a TSV or CSV file or a directory of TXT files. For
    directories with subjects in TSV files, the given vocabulary language
    will be used to convert subject labels into URIs. The corpus will be
    returned as an instance of DocumentCorpus or LimitingDocumentCorpus.

    With no paths, a warning is logged and the null device is opened as an
    empty corpus. Several paths are wrapped in a ``CombinedCorpus``. When
    ``docs_limit`` is not ``None`` the result is wrapped in a
    ``LimitingDocumentCorpus``.
    """

    def open_doc_path(path, subject_index):
        """open a single path and return it as a DocumentCorpus"""
        if os.path.isdir(path):
            return annif.corpus.DocumentDirectory(
                path, subject_index, vocab_lang, require_subjects=True
            )
        if annif.corpus.DocumentFileCSV.is_csv_file(path):
            return annif.corpus.DocumentFileCSV(path, subject_index)
        # Anything that is neither a directory nor a CSV file is read as TSV.
        return annif.corpus.DocumentFileTSV(path, subject_index)

    if not paths:
        logger.warning("Reading empty file")
        docs = open_doc_path(os.path.devnull, subject_index)
    elif len(paths) == 1:
        docs = open_doc_path(paths[0], subject_index)
    else:
        corpora = [open_doc_path(path, subject_index) for path in paths]
        docs = annif.corpus.CombinedCorpus(corpora)
    if docs_limit is not None:
        docs = annif.corpus.LimitingDocumentCorpus(docs, docs_limit)
    return docs
165
166
167
def open_text_documents(paths: tuple[str, ...], docs_limit: int | None) -> DocumentList:
    """
    Helper function to read text documents from the given file paths. Returns a
    DocumentList object with Documents having no subjects. If a path is "-", the
    document text is read from standard input. The maximum number of documents to read
    is set by docs_limit parameter.

    Files are opened as UTF-8 (a leading BOM is skipped) with undecodable bytes
    replaced. Documents are produced lazily by a generator.
    """

    def _docs(paths):
        for path in paths:
            if path == "-":
                doc = annif.corpus.Document(text=sys.stdin.read(), subject_set=None)
            else:
                with open(path, errors="replace", encoding="utf-8-sig") as docfile:
                    doc = annif.corpus.Document(text=docfile.read(), subject_set=None)
            yield doc

    # Slicing with None keeps all paths, so no separate "no limit" branch is needed.
    return annif.corpus.DocumentList(_docs(paths[:docs_limit]))
185
186
187
def show_hits(
    hits: SuggestionResult,
    project: AnnifProject,
    lang: str,
    # NOTE: `io` is imported only under TYPE_CHECKING; this is safe because
    # `from __future__ import annotations` keeps annotations unevaluated.
    file: io.TextIOWrapper | None = None,
) -> None:
    """
    Print subject suggestions to the console or a file. The suggestions are displayed as
    a table, with one row per hit. Each row contains the URI, label, possible notation,
    and score of the suggestion. The label is given in the specified language.

    Fields are tab-separated, the URI is wrapped in angle brackets and the score
    is printed with four decimals. Output goes through ``click.echo`` to ``file``.
    """
    template = "<{}>\t{}\t{:.04f}"
    for hit in hits:
        subj = project.subjects[hit.subject_id]
        line = template.format(
            subj.uri,
            # filter(None, ...) drops a missing/empty label or notation.
            "\t".join(filter(None, (subj.labels[lang], subj.notation))),
            hit.score,
        )
        click.echo(line, file=file)
207
208
209
def parse_backend_params(
    backend_param: tuple[str, ...] | tuple[()], project: AnnifProject
) -> collections.defaultdict[str, dict[str, str]]:
    """Parse a list of backend parameters given with the --backend-param
    option into a nested dict structure.

    Each entry has the form ``<backend>.<parameter>=<value>``; the result maps
    backend -> parameter -> value. An entry lacking the ``.`` or ``=``
    separator raises ``ValueError`` from the unpacking below, and a backend not
    matching the project is rejected by ``_validate_backend_params``.
    """
    backend_params = collections.defaultdict(dict)
    for beparam in backend_param:
        # Split only on the first separator so values may contain "." or "=".
        backend, param = beparam.split(".", 1)
        key, val = param.split("=", 1)
        _validate_backend_params(backend, beparam, project)
        backend_params[backend][key] = val
    return backend_params
221
222
223
def _validate_backend_params(backend: str, beparam: str, project: AnnifProject) -> None:
224
    if backend != project.config["backend"]:
225
        raise ConfigurationException(
226
            'The backend {} in CLI option "-b {}" not matching the project'
227
            " backend {}.".format(backend, beparam, project.config["backend"])
228
        )
229
230
231
def generate_filter_params(filter_batch_max_limit: int) -> list[tuple[int, float]]:
    """Return all (limit, threshold) combinations to try.

    Limits run from 1 to ``filter_batch_max_limit`` inclusive; thresholds are
    the 20 values ``0.0, 0.05, ..., 0.95``. Pairs are ordered by limit first,
    then threshold.
    """
    limits = range(1, filter_batch_max_limit + 1)
    thresholds = [i * 0.05 for i in range(20)]
    return list(itertools.product(limits, thresholds))
235
236
237
def _get_completion_choices(
238
    param: Argument,
239
) -> dict[str, AnnifVocabulary] | dict[str, AnnifProject] | list:
240
    if param.name in ("project_id", "project_ids_pattern"):
241
        return annif.registry.get_projects()
242
    elif param.name == "vocab_id":
243
        return annif.registry.get_vocabs()
244
    else:
245
        return []
246
247
248
def complete_param(ctx: Context, param: Argument, incomplete: str) -> list[str]:
    """Shell completion callback: return the choices starting with ``incomplete``.

    Candidates come from ``_get_completion_choices`` and are looked up inside
    the application context. Iterating a dict result yields its keys
    (presumably the project or vocabulary IDs, per the return annotation of
    ``_get_completion_choices``).
    """
    with ctx.obj.load_app().app_context():
        return [
            choice
            for choice in _get_completion_choices(param)
            if choice.startswith(incomplete)
        ]
255