Passed
Push — resolve-keras-userwarning ( 34af57 )
by Juho
03:31
created

annif.cli_util.get_output_stream()   C

Complexity

Conditions 9

Size

Total Lines 22
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 9
eloc 15
nop 5
dl 0
loc 22
rs 6.6666
c 0
b 0
f 0
1
"""Utility functions for Annif CLI commands"""
2
3
from __future__ import annotations
4
5
import collections
6
import gzip
7
import itertools
8
import os
9
import re
10
import sys
11
from contextlib import nullcontext
12
from typing import TYPE_CHECKING, Optional, TextIO
13
14
import click
15
import click_log
16
from flask import current_app
17
18
import annif
19
from annif.exception import ConfigurationException
20
from annif.project import Access
21
22
if TYPE_CHECKING:
23
    import io
24
    from datetime import datetime
25
26
    from click.core import Argument, Context, Option
27
28
    from annif.corpus.document import DocumentCorpus, DocumentList
29
    from annif.corpus.subject import SubjectIndex
30
    from annif.project import AnnifProject
31
    from annif.suggestion import SuggestionResult
32
    from annif.vocab import AnnifVocabulary
33
34
# Module-level alias for the Annif logger; common_options() hands it to click_log.
logger = annif.logger
35
36
37
def _set_project_config_file_path(
    ctx: Context, param: Option, value: str | None
) -> None:
    """Click option callback that stores the CLI-supplied projects path.

    A non-empty value replaces the PROJECTS_CONFIG_PATH setting of the
    application (i.e. the default path or the one given in the environment).
    The application context is entered even when no value was supplied.
    """
    app = ctx.obj.load_app()
    with app.app_context():
        if not value:
            return
        current_app.config["PROJECTS_CONFIG_PATH"] = value
44
45
46
def common_options(f):
    """Decorator that attaches the options shared by all CLI commands.

    The eager ``-p``/``--projects`` option is applied first; its value is
    consumed by ``_set_project_config_file_path`` instead of being passed to
    the command. The click_log verbosity option, bound to the module logger,
    is applied on top of it.
    """
    projects_option = click.option(
        "-p",
        "--projects",
        help="Set path to project configuration file or directory",
        type=click.Path(dir_okay=True, exists=True),
        callback=_set_project_config_file_path,
        expose_value=False,
        is_eager=True,
    )
    verbosity_option = click_log.simple_verbosity_option(logger)
    return verbosity_option(projects_option(f))
58
59
60
def project_id(f):
    """Decorator that declares a ``project_id`` argument on a CLI command.

    Shell completion for the argument is delegated to ``complete_param``.
    """
    add_argument = click.argument("project_id", shell_complete=complete_param)
    return add_argument(f)
63
64
65
def backend_param_option(f):
    """Decorator adding the repeatable ``--backend-param``/``-b`` option that lets
    CLI commands override backend parameters of the config file"""
    help_text = (
        "Override backend parameter of the config file. "
        "Syntax: `-b <backend>.<parameter>=<value>`."
    )
    decorate = click.option("--backend-param", "-b", multiple=True, help=help_text)
    return decorate(f)
74
75
76
def docs_limit_option(f):
    """Decorator adding the ``--docs-limit``/``-d`` option, a non-negative integer
    (default None) that caps the number of documents to use"""
    non_negative_int = click.IntRange(0, None)
    decorate = click.option(
        "--docs-limit",
        "-d",
        default=None,
        type=non_negative_int,
        help="Maximum number of documents to use",
    )
    return decorate(f)
86
87
88
def get_project(project_id: str) -> AnnifProject:
    """Fetch a project from the registry by ID, bailing out if it is missing.

    The lookup passes Access.private as the minimum access level. When the
    registry raises ValueError, a message is written to stderr and the process
    exits with status 1.
    """
    try:
        project = annif.registry.get_project(project_id, min_access=Access.private)
    except ValueError:
        click.echo(f"No projects found with id '{project_id}'.", err=True)
        sys.exit(1)
    return project
96
97
98
def get_vocab(vocab_id: str) -> AnnifVocabulary:
    """Fetch a vocabulary from the registry by ID, bailing out if it is missing.

    The lookup passes Access.private as the minimum access level. When the
    registry raises ValueError, a message is written to stderr and the process
    exits with status 1.
    """
    try:
        vocab = annif.registry.get_vocab(vocab_id, min_access=Access.private)
    except ValueError:
        message = f"No vocabularies found with the id '{vocab_id}'."
        click.echo(message, err=True)
        sys.exit(1)
    else:
        return vocab
107
108
109
def make_list_template(*rows) -> str:
    """Build a ``str.format`` template for printing rows as aligned columns.

    Every column is left-aligned and padded to the length of its longest item
    across the given rows; columns are separated by two spaces. With no rows
    the template is the empty string.
    """
    widths = {}
    for row in rows:
        for column, item in enumerate(row):
            widths[column] = max(widths.get(column, 0), len(item))

    placeholders = (f"{{{column}: <{width}}}" for column, width in widths.items())
    return "  ".join(placeholders)
125
126
127
def format_datetime(dt: datetime | None) -> str:
    """Render a datetime as "YYYY-MM-DD HH:MM:SS" in the local time zone, or return
    "-" when no datetime is given."""
    if dt is not None:
        local_dt = dt.astimezone()
        return local_dt.strftime("%Y-%m-%d %H:%M:%S")
    return "-"
132
133
134
def open_doc_path(path, subject_index, vocab_lang, require_subjects=True):
    """Open a single path and return it as a document corpus.

    A directory is opened as a DocumentDirectory. Otherwise the file is opened
    as CSV or JSONL when the respective format check accepts it, and as TSV in
    every remaining case.
    """
    corpus = annif.corpus
    if os.path.isdir(path):
        return corpus.DocumentDirectory(
            path, subject_index, vocab_lang, require_subjects
        )
    if corpus.DocumentFileCSV.is_csv_file(path):
        return corpus.DocumentFileCSV(path, subject_index, require_subjects)
    if corpus.DocumentFileJSONL.is_jsonl_file(path):
        return corpus.DocumentFileJSONL(
            path, subject_index, vocab_lang, require_subjects
        )
    # Anything not recognised above is read as TSV.
    return corpus.DocumentFileTSV(path, subject_index, require_subjects)
148
149
150
def open_documents(
    paths: tuple[str, ...],
    subject_index: SubjectIndex,
    vocab_lang: str,
    docs_limit: int | None,
) -> DocumentCorpus:
    """Open a document corpus from a list of pathnames.

    Each path is opened with ``open_doc_path``, so it may be a CSV, TSV or
    JSONL file or a directory. For corpora with subjects expressed as labels,
    the given vocabulary language will be used to convert subject labels into
    URIs. With no paths at all, a warning is logged and the null device is
    read instead. Several paths are merged into a CombinedCorpus, and when
    docs_limit is not None the result is wrapped in a LimitingDocumentCorpus.
    """
    sources = paths
    if not sources:
        logger.warning("Reading empty file")
        sources = (os.path.devnull,)

    corpora = [open_doc_path(source, subject_index, vocab_lang) for source in sources]
    # A single corpus is used as is; only several corpora need combining.
    docs = corpora[0] if len(corpora) == 1 else annif.corpus.CombinedCorpus(corpora)

    if docs_limit is None:
        return docs
    return annif.corpus.LimitingDocumentCorpus(docs, docs_limit)
174
175
176
def open_text_documents(paths: tuple[str, ...], docs_limit: int | None) -> DocumentList:
    """
    Read plain text documents from the given file paths and return them as a
    DocumentList whose Documents carry no subjects. The path "-" stands for
    standard input. Files are decoded as UTF-8 (a leading BOM is dropped) with
    undecodable bytes replaced. Only the first docs_limit paths are used; None
    means all of them. Documents are produced lazily.
    """

    def _read_text(path):
        if path == "-":
            return sys.stdin.read()
        with open(path, errors="replace", encoding="utf-8-sig") as docfile:
            return docfile.read()

    selected_paths = paths[:docs_limit]
    documents = (
        annif.corpus.Document(text=_read_text(path), subject_set=None)
        for path in selected_paths
    )
    return annif.corpus.DocumentList(documents)
194
195
196
def get_output_stream(
    path: str, suffix: str, output: Optional[str], use_gzip: bool, force: bool
) -> Optional[TextIO]:
    """Return a writable output stream based on the output option.

    The target is chosen as follows:

    * ``output == "-"``: ``sys.stdout`` wrapped in a ``nullcontext``, so that
      callers can use the result in a ``with`` statement without closing
      standard output.
    * any other non-empty ``output``: that file name.
    * no ``output``: a name derived from ``path`` by removing the final
      extension of the file name (together with a trailing ``.gz``) and
      appending ``suffix``.

    For the two file cases ``.gz`` is appended when ``use_gzip`` is set and
    the name does not already end with it.

    Returns None, after printing a notice, when the target file already exists
    and ``force`` is false. Otherwise the file is opened for writing UTF-8
    text, through gzip when ``use_gzip`` is set.
    """
    if output == "-":
        return nullcontext(sys.stdout)

    if output:
        outfilename = output
    else:
        # The extension may not contain a path separator; otherwise a dot in a
        # directory name (e.g. "my.dir/file") would be taken as the start of
        # the extension and the output would land in the wrong directory.
        separators = re.escape(os.sep + (os.altsep or ""))
        extension_pattern = rf"(\.[^.{separators}]+)?(\.gz)?$"
        outfilename = re.sub(extension_pattern, "", path) + suffix

    if use_gzip and not outfilename.endswith(".gz"):
        outfilename += ".gz"

    if not force and os.path.exists(outfilename):
        click.echo(f"Not overwriting {outfilename} (use --force to override)")
        return None

    opener = gzip.open if use_gzip else open
    return opener(outfilename, "wt", encoding="utf-8")
218
219
220
def show_hits(
    hits: SuggestionResult,
    project: AnnifProject,
    lang: str,
    file: io.TextIOWrapper | None = None,
) -> None:
    """
    Print subject suggestions to the console or a file, one tab-separated row per
    hit. A row holds the URI in angle brackets, the label in the specified language,
    the notation when the subject has one, and the score with four decimals.
    """
    for hit in hits:
        subject = project.subjects[hit.subject_id]
        uri = subject.uri
        # Falsy fields (e.g. a missing notation) are left out of the row.
        label_fields = [
            field for field in (subject.labels[lang], subject.notation) if field
        ]
        row = "<{}>\t{}\t{:.04f}".format(uri, "\t".join(label_fields), hit.score)
        click.echo(row, file=file)
240
241
242
def parse_backend_params(
    backend_param: tuple[str, ...] | tuple[()], project: AnnifProject
) -> collections.defaultdict[str, dict[str, str]]:
    """Turn the values of the --backend-param option into a nested dict.

    Each value has the form ``<backend>.<parameter>=<value>`` and ends up as
    ``result[backend][parameter] = value``. The backend of every value is
    checked against the project with ``_validate_backend_params``. A value
    lacking the "." or the "=" separator raises ValueError.
    """
    parsed = collections.defaultdict(dict)
    for raw_param in backend_param:
        # Split at the first "." and then at the first "=" only, so the
        # parameter value itself may contain either character.
        backend, assignment = raw_param.split(".", 1)
        key, val = assignment.split("=", 1)
        _validate_backend_params(backend, raw_param, project)
        parsed[backend].update({key: val})
    return parsed
254
255
256
def _validate_backend_params(backend: str, beparam: str, project: AnnifProject) -> None:
257
    if backend != project.config["backend"]:
258
        raise ConfigurationException(
259
            'The backend {} in CLI option "-b {}" not matching the project'
260
            " backend {}.".format(backend, beparam, project.config["backend"])
261
        )
262
263
264
def parse_metadata(metadata: tuple[str, ...] | tuple[()]) -> dict[str, str]:
    """Turn the values of the --metadata option into a dictionary.

    Each value must have the form ``<key>=<value>``; it is split at the first
    "=" only. A value without "=" raises click.BadParameter. When a key is
    repeated, the last value wins.
    """
    parsed = {}
    for item in metadata:
        key, separator, value = item.partition("=")
        if not separator:
            raise click.BadParameter(f"--metadata '{item}'. Expected <key>=<value>.")
        parsed[key] = value
    return parsed
276
277
278
def generate_filter_params(filter_batch_max_limit: int) -> list[tuple[int, float]]:
    """Return every (limit, threshold) pair, with limits running from 1 to
    filter_batch_max_limit and thresholds from 0 in 20 steps of 0.05. Pairs are
    ordered by limit first, then by threshold."""
    thresholds = [step * 0.05 for step in range(20)]
    return [
        (limit, threshold)
        for limit in range(1, filter_batch_max_limit + 1)
        for threshold in thresholds
    ]
282
283
284
def _get_completion_choices(
285
    param: Argument,
286
) -> dict[str, AnnifVocabulary] | dict[str, AnnifProject] | list:
287
    if param.name in ("project_id", "project_ids_pattern"):
288
        return annif.registry.get_projects()
289
    elif param.name == "vocab_id":
290
        return annif.registry.get_vocabs()
291
    else:
292
        return []
293
294
295
def complete_param(ctx: Context, param: Argument, incomplete: str) -> list[str]:
    """Shell completion callback: within the application context, return the
    completion choices for the parameter that start with the text typed so far"""
    with ctx.obj.load_app().app_context():
        candidates = _get_completion_choices(param)
        return list(filter(lambda name: name.startswith(incomplete), candidates))
302