Passed
Pull Request — main (#708)
by Juho
05:36 queued 02:48
created

annif.cli_util.open_documents()   B

Complexity

Conditions 5

Size

Total Lines 31
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 20
nop 4
dl 0
loc 31
rs 8.9332
c 0
b 0
f 0
1
"""Utility functions for Annif CLI commands"""
2
from __future__ import annotations
3
4
import collections
5
import itertools
6
import os
7
import sys
8
from typing import TYPE_CHECKING, DefaultDict, Dict, List, Optional, Tuple, Union
9
10
import click
11
import click_log
12
from flask import current_app
13
14
import annif
15
from annif.exception import ConfigurationException
16
from annif.project import Access
17
18
if TYPE_CHECKING:
19
    from datetime import datetime
20
    from io import TextIOWrapper
21
22
    from click.core import Argument, Context, Option
23
24
    from annif.corpus.document import DocumentCorpus, DocumentList
25
    from annif.corpus.subject import SubjectIndex
26
    from annif.project import AnnifProject
27
    from annif.suggestion import SuggestionResult
28
    from annif.vocab import AnnifVocabulary
29
30
# Logger object taken from the annif package; in this module it is handed to
# the click_log verbosity option and used for warnings.
logger = annif.logger
31
32
33
def _set_project_config_file_path(
    ctx: Context, param: Option, value: Optional[str]
) -> None:
    """Click callback that applies the projects configuration path from the CLI.

    When a value is given it is stored under PROJECTS_CONFIG_PATH in the
    application config, overriding the default path or the one given in the
    environment. Without a value the configuration is left untouched.
    """
    app = ctx.obj.load_app()
    with app.app_context():
        if not value:
            return
        current_app.config["PROJECTS_CONFIG_PATH"] = value
40
41
42
def common_options(f):
    """Decorator that attaches the options shared by all CLI commands.

    Adds the -p/--projects option, which is handled eagerly by a callback and
    not passed on to the command function, and the click_log verbosity option.
    """
    projects_option = click.option(
        "-p",
        "--projects",
        help="Set path to project configuration file or directory",
        type=click.Path(dir_okay=True, exists=True),
        callback=_set_project_config_file_path,
        expose_value=False,
        is_eager=True,
    )
    verbosity_option = click_log.simple_verbosity_option(logger)
    # The projects option is applied first, the verbosity option wraps it.
    return verbosity_option(projects_option(f))
54
55
56
def project_id(f):
    """Decorator that gives a CLI command a project_id argument with shell
    completion"""
    add_argument = click.argument("project_id", shell_complete=complete_param)
    return add_argument(f)
59
60
61
def backend_param_option(f):
    """Decorator that gives a CLI command the repeatable --backend-param/-b
    option for overriding backend parameters"""
    help_text = (
        "Override backend parameter of the config file. "
        "Syntax: `-b <backend>.<parameter>=<value>`."
    )
    add_option = click.option(
        "--backend-param",
        "-b",
        multiple=True,
        help=help_text,
    )
    return add_option(f)
70
71
72
def docs_limit_option(f):
    """Decorator that gives a CLI command the --docs-limit/-d option, which caps
    the number of documents to use. The value must be a non-negative integer and
    defaults to None."""
    non_negative_int = click.IntRange(0, None)
    add_option = click.option(
        "--docs-limit",
        "-d",
        default=None,
        type=non_negative_int,
        help="Maximum number of documents to use",
    )
    return add_option(f)
82
83
84
def get_project(project_id: str) -> AnnifProject:
    """
    Look up a project by its ID in the registry. If the lookup raises
    ValueError, print an error message to stderr and exit with status 1."""
    try:
        project = annif.registry.get_project(project_id, min_access=Access.private)
    except ValueError:
        click.echo(f"No projects found with id '{project_id}'.", err=True)
        sys.exit(1)
    else:
        return project
92
93
94
def get_vocab(vocab_id: str) -> AnnifVocabulary:
    """
    Look up a vocabulary by its ID in the registry. If the lookup raises
    ValueError, print an error message to stderr and exit with status 1."""
    try:
        vocab = annif.registry.get_vocab(vocab_id, min_access=Access.private)
    except ValueError:
        message = "No vocabularies found with the id '{0}'.".format(vocab_id)
        click.echo(message, err=True)
        sys.exit(1)
    else:
        return vocab
103
104
105
def make_list_template(*rows) -> str:
    """Build a str.format template for printing the given rows as aligned columns.

    Every column becomes a left-aligned positional field that is as wide as the
    longest item found in that column; fields are separated by two spaces. Rows
    may have different lengths. Without any rows the template is empty."""

    widths = []
    for row in rows:
        for position, item in enumerate(row):
            item_width = len(item)
            if position == len(widths):
                # First time this column is seen.
                widths.append(item_width)
            elif item_width > widths[position]:
                widths[position] = item_width

    fields = [f"{{{position}: <{width}}}" for position, width in enumerate(widths)]
    return "  ".join(fields)
121
122
123
def format_datetime(dt: Optional[datetime]) -> str:
    """Render a datetime in local time as "YYYY-MM-DD HH:MM:SS"; a missing
    value (None) is rendered as "-"."""
    return "-" if dt is None else dt.astimezone().strftime("%Y-%m-%d %H:%M:%S")
128
129
130
def open_documents(
    paths: Union[Tuple[str, ...], Tuple[()]],
    subject_index: SubjectIndex,
    vocab_lang: str,
    docs_limit: Optional[int],
) -> DocumentCorpus:
    """Open a document corpus from the given pathnames.

    A path that is a directory is opened as a DocumentDirectory (subjects
    required, with the given vocabulary language passed along); any other path
    is opened as a DocumentFile. No paths at all results in a warning and an
    empty corpus read from the null device, and several paths are merged into
    a CombinedCorpus. When docs_limit is given, the corpus is wrapped in a
    LimitingDocumentCorpus."""

    def _open_single(path):
        """open one path as a document corpus"""
        if not os.path.isdir(path):
            return annif.corpus.DocumentFile(path, subject_index)
        return annif.corpus.DocumentDirectory(
            path, subject_index, vocab_lang, require_subjects=True
        )

    path_count = len(paths)
    if path_count == 0:
        logger.warning("Reading empty file")
        corpus = _open_single(os.path.devnull)
    elif path_count == 1:
        corpus = _open_single(paths[0])
    else:
        corpus = annif.corpus.CombinedCorpus([_open_single(path) for path in paths])

    if docs_limit is None:
        return corpus
    return annif.corpus.LimitingDocumentCorpus(corpus, docs_limit)
161
162
163
def open_text_documents(
    paths: Tuple[str, ...], docs_limit: Optional[int]
) -> DocumentList:
    """
    Read plain text documents from the given file paths and return them as a
    DocumentList whose Documents have no subjects. The path "-" stands for
    standard input. At most docs_limit paths are read (all of them when
    docs_limit is None). Files are decoded as UTF-8 with an optional BOM, and
    undecodable bytes are replaced.
    """

    def _read_text(path):
        if path == "-":
            return sys.stdin.read()
        with open(path, errors="replace", encoding="utf-8-sig") as docfile:
            return docfile.read()

    selected_paths = paths[:docs_limit]
    # Lazy generator: each source is only read when the document is consumed.
    documents = (
        annif.corpus.Document(text=_read_text(path), subject_set=None)
        for path in selected_paths
    )
    return annif.corpus.DocumentList(documents)
183
184
185
def show_hits(
    hits: SuggestionResult,
    project: AnnifProject,
    lang: str,
    file: Optional[TextIOWrapper] = None,
) -> None:
    """
    Write subject suggestions to the console or to the given file, one
    tab-separated row per hit. A row holds the subject URI in angle brackets, the
    label in the specified language, the notation when the subject has one, and
    the score with four decimals.
    """
    for hit in hits:
        subject = project.subjects[hit.subject_id]
        # Skip empty fields (e.g. a missing notation) so no blank column is emitted.
        name_fields = [
            field for field in (subject.labels[lang], subject.notation) if field
        ]
        names = "\t".join(name_fields)
        click.echo(f"<{subject.uri}>\t{names}\t{hit.score:.04f}", file=file)
205
206
207
def parse_backend_params(
    backend_param: Union[Tuple[str, ...], Tuple[()]], project: AnnifProject
) -> DefaultDict[str, Dict[str, str]]:
    """Parse the values given with the --backend-param option into a nested
    dict structure.

    Each value must follow the syntax `<backend>.<parameter>=<value>`. The
    result maps backend -> parameter -> value; an empty input gives an empty
    mapping.

    Raises ValueError when a value lacks the "." or the "=" separator. Any
    exception raised by _validate_backend_params for a backend that does not
    match the project is propagated."""
    backend_params = collections.defaultdict(dict)
    for beparam in backend_param:
        # Only the first "." and the first "=" after it act as separators, so
        # the remainder of the string may contain further such characters.
        backend, dot, param = beparam.partition(".")
        key, equals, val = param.partition("=")
        if not dot or not equals:
            # Fail with an explicit message instead of an opaque tuple
            # unpacking error when the value is malformed.
            raise ValueError(
                f'Invalid backend parameter "{beparam}". '
                "Expected syntax: <backend>.<parameter>=<value>"
            )
        _validate_backend_params(backend, beparam, project)
        backend_params[backend][key] = val
    return backend_params
219
220
221
def _validate_backend_params(backend: str, beparam: str, project: AnnifProject) -> None:
    """Check that the backend named in a --backend-param value is the backend
    configured for the project; raise ConfigurationException otherwise."""
    project_backend = project.config["backend"]
    if backend != project_backend:
        raise ConfigurationException(
            f'The backend {backend} in CLI option "-b {beparam}" not matching the'
            f" project backend {project_backend}."
        )
227
228
229
def generate_filter_params(filter_batch_max_limit: int) -> List[Tuple[int, float]]:
    """Return every (limit, threshold) combination, with limits running from 1
    to filter_batch_max_limit and thresholds from 0 in twenty steps of 0.05.
    Combinations are ordered by limit first, then by threshold."""
    thresholds = [step * 0.05 for step in range(20)]
    return [
        (limit, threshold)
        for limit in range(1, filter_batch_max_limit + 1)
        for threshold in thresholds
    ]
233
234
235
def _get_completion_choices(
    param: Argument,
) -> Dict[str, Union[AnnifVocabulary, AnnifProject]]:
    """Return the registry entries that can complete the given CLI parameter.

    For the "project_id" parameter the projects from the registry are
    returned, for "vocab_id" the vocabularies. Any other parameter has no
    completion choices and gives an empty dict."""
    if param.name == "project_id":
        return annif.registry.get_projects()
    if param.name == "vocab_id":
        return annif.registry.get_vocabs()
    # An empty dict (not a list) keeps the fallback consistent with the
    # annotated return type; iterating over it behaves the same for callers.
    return {}
244
245
246
def complete_param(ctx: Context, param: Argument, incomplete: str) -> List[str]:
    """Shell completion callback: return the completion choices for the given
    parameter that start with the incomplete text typed so far."""
    app = ctx.obj.load_app()
    with app.app_context():
        matches = []
        for candidate in _get_completion_choices(param):
            if candidate.startswith(incomplete):
                matches.append(candidate)
        return matches
253