annif.cli_util.project_id()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
"""Utility functions for Annif CLI commands"""
2
3
from __future__ import annotations
4
5
import collections
6
import itertools
7
import os
8
import sys
9
from typing import TYPE_CHECKING
10
11
import click
12
import click_log
13
from flask import current_app
14
15
import annif
16
from annif.exception import ConfigurationException
17
from annif.project import Access
18
19
if TYPE_CHECKING:
20
    import io
21
    from datetime import datetime
22
23
    from click.core import Argument, Context, Option
24
25
    from annif.corpus.document import DocumentCorpus, DocumentList
26
    from annif.corpus.subject import SubjectIndex
27
    from annif.project import AnnifProject
28
    from annif.suggestion import SuggestionResult
29
    from annif.vocab import AnnifVocabulary
30
31
# Logger object exposed by the annif package; used in this module for the CLI
# verbosity option and for warnings emitted by the helper functions.
logger = annif.logger
32
33
34
def _set_project_config_file_path(
    ctx: Context, param: Option, value: str | None
) -> None:
    """Click option callback: store the project configuration path given on the
    command line in the app config, overriding the default or env-provided path.

    The app is loaded and its context entered even when no value is given; in
    that case the configuration is left untouched.
    """
    app = ctx.obj.load_app()
    with app.app_context():
        if not value:
            return
        current_app.config["PROJECTS_CONFIG_PATH"] = value
41
42
43
def common_options(f):
    """Decorator that attaches the options shared by all CLI commands.

    Adds the eager ``-p``/``--projects`` option, whose value is handled by the
    ``_set_project_config_file_path`` callback rather than passed to the
    command, and then the click_log verbosity option bound to the module logger.
    """
    projects_option = click.option(
        "-p",
        "--projects",
        help="Set path to project configuration file or directory",
        type=click.Path(dir_okay=True, exists=True),
        callback=_set_project_config_file_path,
        expose_value=False,
        is_eager=True,
    )
    verbosity_option = click_log.simple_verbosity_option(logger)
    return verbosity_option(projects_option(f))
55
56
57
def project_id(f):
    """Decorator that adds a positional ``project_id`` argument to a CLI command.

    Shell completion for the argument is delegated to ``complete_param``.
    """
    add_argument = click.argument("project_id", shell_complete=complete_param)
    return add_argument(f)
60
61
62
def backend_param_option(f):
    """Decorator that adds the repeatable ``--backend-param``/``-b`` option, used
    by CLI commands to override backend parameters of the config file"""
    help_text = (
        "Override backend parameter of the config file. "
        + "Syntax: `-b <backend>.<parameter>=<value>`."
    )
    add_option = click.option("--backend-param", "-b", multiple=True, help=help_text)
    return add_option(f)
71
72
73
def docs_limit_option(f):
    """Decorator that adds the ``--docs-limit``/``-d`` option, which lets a CLI
    command cap the number of documents it uses. The option takes an integer
    in ``click.IntRange(0, None)`` and defaults to None."""
    allowed_range = click.IntRange(0, None)
    add_option = click.option(
        "--docs-limit",
        "-d",
        default=None,
        type=allowed_range,
        help="Maximum number of documents to use",
    )
    return add_option(f)
83
84
85
def get_project(project_id: str) -> AnnifProject:
    """Look up a project by its ID in the Annif registry.

    The lookup is made with ``min_access=Access.private``. If the registry
    raises ValueError, an error message is written to stderr and the process
    exits with status 1.
    """
    try:
        project = annif.registry.get_project(project_id, min_access=Access.private)
    except ValueError:
        message = "No projects found with id '{0}'.".format(project_id)
        click.echo(message, err=True)
        sys.exit(1)
    else:
        return project
93
94
95
def get_vocab(vocab_id: str) -> AnnifVocabulary:
    """Look up a vocabulary by its ID in the Annif registry.

    The lookup is made with ``min_access=Access.private``. If the registry
    raises ValueError, an error message is written to stderr and the process
    exits with status 1.
    """
    try:
        vocab = annif.registry.get_vocab(vocab_id, min_access=Access.private)
    except ValueError:
        click.echo(f"No vocabularies found with the id '{vocab_id}'.", err=True)
        sys.exit(1)
    else:
        return vocab
104
105
106
def make_list_template(*rows) -> str:
    """Build a ``str.format`` template for printing rows as aligned columns.

    Each positional field is left-aligned and padded to the length of the longest
    item found at that position in the given rows; fields are separated by two
    spaces. Rows may have different lengths. With no rows the template is empty.
    """
    column_widths: list[int] = []
    for row in rows:
        for position, item in enumerate(row):
            item_length = len(item)
            if position == len(column_widths):
                # First time this column is seen.
                column_widths.append(item_length)
            elif item_length > column_widths[position]:
                column_widths[position] = item_length

    placeholders = (
        f"{{{position}: <{width}}}" for position, width in enumerate(column_widths)
    )
    return "  ".join(placeholders)
122
123
124
def format_datetime(dt: datetime | None) -> str:
    """Render a datetime as ``YYYY-MM-DD HH:MM:SS`` in the local time zone, or
    return "-" when no datetime is given."""
    if dt is not None:
        local_dt = dt.astimezone()
        return local_dt.strftime("%Y-%m-%d %H:%M:%S")
    return "-"
129
130
131
def open_doc_path(path, subject_index, vocab_lang, require_subjects=True):
    """Open one path as a document corpus, picking the corpus class by path type.

    A directory becomes a DocumentDirectory; otherwise the file is checked for
    CSV and then JSONL format, and anything else is opened as TSV. The vocabulary
    language is passed only to the directory and JSONL corpora.
    """
    corpus = annif.corpus
    if os.path.isdir(path):
        return corpus.DocumentDirectory(
            path, subject_index, vocab_lang, require_subjects
        )
    if corpus.DocumentFileCSV.is_csv_file(path):
        return corpus.DocumentFileCSV(path, subject_index, require_subjects)
    if corpus.DocumentFileJSONL.is_jsonl_file(path):
        return corpus.DocumentFileJSONL(
            path, subject_index, vocab_lang, require_subjects
        )
    # Fallback: any file not recognised above is opened as TSV.
    return corpus.DocumentFileTSV(path, subject_index, require_subjects)
145
146
147
def open_documents(
    paths: tuple[str, ...],
    subject_index: SubjectIndex,
    vocab_lang: str,
    docs_limit: int | None,
) -> DocumentCorpus:
    """Open a document corpus from a sequence of pathnames.

    Each path is either a CSV, TSV or JSONL file or a directory of TXT or JSON
    files. For corpora with subjects expressed as labels, the given vocabulary
    language is used to convert subject labels into URIs. With no paths, a
    warning is logged and the null device is opened instead; with several
    paths, the individual corpora are wrapped in a CombinedCorpus. When
    docs_limit is not None the result is wrapped in a LimitingDocumentCorpus.
    """
    if not paths:
        logger.warning("Reading empty file")
        corpus = open_doc_path(os.path.devnull, subject_index, vocab_lang)
    elif len(paths) > 1:
        corpus = annif.corpus.CombinedCorpus(
            [open_doc_path(path, subject_index, vocab_lang) for path in paths]
        )
    else:
        corpus = open_doc_path(paths[0], subject_index, vocab_lang)

    if docs_limit is None:
        return corpus
    return annif.corpus.LimitingDocumentCorpus(corpus, docs_limit)
171
172
173
def open_text_documents(paths: tuple[str, ...], docs_limit: int | None) -> DocumentList:
    """
    Read text documents from the given file paths and return them as a
    DocumentList whose Documents have no subjects. A path of "-" means the
    document text is read from standard input. At most docs_limit paths are
    used; None means no limit. The documents are produced lazily.
    """

    def _read_text(path):
        # "-" selects standard input; anything else is a file path.
        if path == "-":
            return sys.stdin.read()
        with open(path, errors="replace", encoding="utf-8-sig") as docfile:
            return docfile.read()

    selected_paths = paths[:docs_limit]
    documents = (
        annif.corpus.Document(text=_read_text(path), subject_set=None)
        for path in selected_paths
    )
    return annif.corpus.DocumentList(documents)
191
192
193
def show_hits(
    hits: SuggestionResult,
    project: AnnifProject,
    lang: str,
    file: io.TextIOWrapper | None = None,
) -> None:
    """
    Write subject suggestions to the console or to the given file, one
    tab-separated line per hit: the subject URI in angle brackets, the label in
    the specified language, the notation when the subject has one, and the score
    formatted with four decimals.
    """
    template = "<{}>\t{}\t{:.04f}"
    for hit in hits:
        subject = project.subjects[hit.subject_id]
        # Falsy parts (e.g. a missing notation) are dropped so that they do not
        # leave an empty column in the output line.
        label_parts = [
            part for part in (subject.labels[lang], subject.notation) if part
        ]
        click.echo(
            template.format(subject.uri, "\t".join(label_parts), hit.score),
            file=file,
        )
213
214
215
def parse_backend_params(
    backend_param: tuple[str, ...] | tuple[()], project: AnnifProject
) -> collections.defaultdict[str, dict[str, str]]:
    """Parse the values of the --backend-param option into a nested dict.

    Each value must have the form ``<backend>.<parameter>=<value>``. Only the
    first "." and the first "=" after it act as separators, so the parameter
    value may itself contain those characters. The result maps each backend
    name to a dict of parameter name to value.

    Raises click.BadParameter for a value that lacks the "." or "=" separator,
    and whatever _validate_backend_params raises when the backend does not
    match the backend of the given project.
    """
    backend_params = collections.defaultdict(dict)
    for beparam in backend_param:
        backend, dot, param = beparam.partition(".")
        key, equals, val = param.partition("=")
        if not (dot and equals):
            # Report malformed input as a usage error, as parse_metadata does,
            # instead of letting tuple unpacking fail with a bare ValueError.
            raise click.BadParameter(
                f"--backend-param '{beparam}'. "
                "Expected <backend>.<parameter>=<value>."
            )
        _validate_backend_params(backend, beparam, project)
        backend_params[backend][key] = val
    return backend_params
227
228
229
def _validate_backend_params(backend: str, beparam: str, project: AnnifProject) -> None:
    """Check that the backend named in a --backend-param value is the backend
    configured for the project.

    Raises ConfigurationException when the two differ; returns None otherwise.
    """
    project_backend = project.config["backend"]
    if backend == project_backend:
        return
    message = (
        'The backend {} in CLI option "-b {}" not matching the project'
        " backend {}."
    ).format(backend, beparam, project_backend)
    raise ConfigurationException(message)
235
236
237
def parse_metadata(metadata: tuple[str, ...] | tuple[()]) -> dict[str, str]:
    """Turn the values of the --metadata option into a dictionary.

    Each value is split at its first "=" into key and value; a value without
    "=" raises click.BadParameter. If a key occurs more than once, the last
    value is kept.
    """
    parsed = {}
    for item in metadata:
        key, separator, value = item.partition("=")
        if not separator:
            raise click.BadParameter(f"--metadata '{item}'. Expected <key>=<value>.")
        parsed[key] = value
    return parsed
249
250
251
def generate_filter_params(filter_batch_max_limit: int) -> list[tuple[int, float]]:
    """Return every (limit, threshold) pair, with limits running from 1 to
    filter_batch_max_limit inclusive and thresholds being 0.05 multiplied by
    each of 0..19. Pairs are ordered by limit first, then by threshold."""
    threshold_values = [step * 0.05 for step in range(20)]
    limit_values = range(1, filter_batch_max_limit + 1)
    return [*itertools.product(limit_values, threshold_values)]
255
256
257
def _get_completion_choices(
    param: Argument,
) -> dict[str, AnnifVocabulary] | dict[str, AnnifProject] | list:
    """Return the completion candidates for a CLI parameter, chosen by its name:
    the registry's projects for ``project_id`` and ``project_ids_pattern``, its
    vocabularies for ``vocab_id``, and an empty list for anything else."""
    name = param.name
    if name == "vocab_id":
        return annif.registry.get_vocabs()
    if name in ("project_id", "project_ids_pattern"):
        return annif.registry.get_projects()
    return []
266
267
268
def complete_param(ctx: Context, param: Argument, incomplete: str) -> list[str]:
    """Shell-completion callback: within the app context, return the completion
    choices for the parameter that start with the text typed so far."""
    app = ctx.obj.load_app()
    with app.app_context():
        matches = []
        for candidate in _get_completion_choices(param):
            if candidate.startswith(incomplete):
                matches.append(candidate)
        return matches
275