annif.cli_util   A
last analyzed

Complexity

Total Complexity 42

Size/Duplication

Total Lines 271
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 168
dl 0
loc 271
rs 9.0399
c 0
b 0
f 0
wmc 42

18 Functions

Rating   Name   Duplication   Size   Complexity  
A _validate_backend_params() 0 5 2
A parse_backend_params() 0 12 2
A open_text_documents() 0 18 4
A show_hits() 0 20 2
A format_datetime() 0 5 2
A get_vocab() 0 9 2
A _set_project_config_file_path() 0 7 3
A make_list_template() 0 14 3
A backend_param_option() 0 9 1
A docs_limit_option() 0 10 1
A common_options() 0 12 1
A get_project() 0 8 2
A project_id() 0 3 1
A generate_filter_params() 0 4 1
B open_documents() 0 37 7
A _get_completion_choices() 0 9 3
A parse_metadata() 0 12 3
A complete_param() 0 6 2

How to fix   Complexity   

Complexity

Complex classes like annif.cli_util often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Utility functions for Annif CLI commands"""
2
3
from __future__ import annotations
4
5
import collections
6
import itertools
7
import os
8
import sys
9
from typing import TYPE_CHECKING
10
11
import click
12
import click_log
13
from flask import current_app
14
15
import annif
16
from annif.exception import ConfigurationException
17
from annif.project import Access
18
19
if TYPE_CHECKING:
20
    import io
21
    from datetime import datetime
22
23
    from click.core import Argument, Context, Option
24
25
    from annif.corpus.document import DocumentCorpus, DocumentList
26
    from annif.corpus.subject import SubjectIndex
27
    from annif.project import AnnifProject
28
    from annif.suggestion import SuggestionResult
29
    from annif.vocab import AnnifVocabulary
30
31
# Module-wide logger: an alias for the logger object exposed by the annif package.
logger = annif.logger
32
33
34
def _set_project_config_file_path(
    ctx: Context, param: Option, value: str | None
) -> None:
    """Click option callback: when a path is given on the command line, store it
    as PROJECTS_CONFIG_PATH in the app configuration, overriding the default
    path or the path given in env. The app is loaded even when no value is
    given."""
    app = ctx.obj.load_app()
    with app.app_context():
        if not value:
            # nothing given on the command line; keep the existing setting
            return
        current_app.config["PROJECTS_CONFIG_PATH"] = value
41
42
43
def common_options(f):
    """Decorator that attaches the options shared by all CLI commands: the
    -p/--projects option for the project configuration path and the click_log
    verbosity option bound to the module logger"""
    projects_option = click.option(
        "-p",
        "--projects",
        help="Set path to project configuration file or directory",
        type=click.Path(dir_okay=True, exists=True),
        callback=_set_project_config_file_path,
        expose_value=False,
        is_eager=True,
    )
    verbosity_option = click_log.simple_verbosity_option(logger)
    # the projects option is applied first, the verbosity option wraps it
    return verbosity_option(projects_option(f))
55
56
57
def project_id(f):
    """Decorator that adds a "project_id" argument to a CLI command, using
    complete_param for shell completion"""
    add_argument = click.argument("project_id", shell_complete=complete_param)
    return add_argument(f)
60
61
62
def backend_param_option(f):
    """Decorator that adds the repeatable --backend-param/-b option, which lets
    CLI commands override backend parameters of the config file"""
    help_text = (
        "Override backend parameter of the config file. "
        "Syntax: `-b <backend>.<parameter>=<value>`."
    )
    add_option = click.option(
        "--backend-param",
        "-b",
        multiple=True,
        help=help_text,
    )
    return add_option(f)
71
72
73
def docs_limit_option(f):
    """Decorator that adds the --docs-limit/-d option, which lets CLI commands
    limit the number of documents to use. The value is validated with
    click.IntRange(0, None) and defaults to None."""
    limit_type = click.IntRange(0, None)
    add_option = click.option(
        "--docs-limit",
        "-d",
        default=None,
        type=limit_type,
        help="Maximum number of documents to use",
    )
    return add_option(f)
83
84
85
def get_project(project_id: str) -> AnnifProject:
    """
    Helper function to get a project by ID and bail out if it doesn't exist.

    The project is looked up in the Annif registry with min_access set to
    Access.private. If the lookup raises ValueError, an error message is
    written to standard error and the process exits with status 1."""
    try:
        return annif.registry.get_project(project_id, min_access=Access.private)
    except ValueError:
        # same f-string message style as get_vocab; the emitted text is unchanged
        click.echo(f"No projects found with id '{project_id}'.", err=True)
        sys.exit(1)
93
94
95
def get_vocab(vocab_id: str) -> AnnifVocabulary:
    """
    Helper function that looks up a vocabulary by ID in the registry. If the
    lookup raises ValueError, an error message is printed to standard error
    and the process exits with status 1."""
    try:
        vocab = annif.registry.get_vocab(vocab_id, min_access=Access.private)
    except ValueError:
        click.echo(f"No vocabularies found with the id '{vocab_id}'.", err=True)
        sys.exit(1)
    return vocab
104
105
106
def make_list_template(*rows) -> str:
    """Helper function that builds a str.format template for printing rows as
    left-aligned columns separated by two spaces. Each column is as wide as the
    longest item found in that position in any of the given rows; rows may have
    differing numbers of fields."""

    widths: list[int] = []
    for row in rows:
        for position, item in enumerate(row):
            item_width = len(item)
            if position == len(widths):
                # first time this column position is seen
                widths.append(item_width)
            elif item_width > widths[position]:
                widths[position] = item_width

    placeholders = (
        f"{{{position}: <{width}}}" for position, width in enumerate(widths)
    )
    return "  ".join(placeholders)
122
123
124
def format_datetime(dt: datetime | None) -> str:
    """Helper function that renders a datetime in the local time zone as
    "YYYY-MM-DD HH:MM:SS". Returns "-" when no datetime is given."""
    if dt is not None:
        local_dt = dt.astimezone()
        return local_dt.strftime("%Y-%m-%d %H:%M:%S")
    return "-"
129
130
131
def open_documents(
    paths: tuple[str, ...],
    subject_index: SubjectIndex,
    vocab_lang: str,
    docs_limit: int | None,
) -> DocumentCorpus:
    """Helper function that opens a document corpus from a list of pathnames.
    Each path is either a CSV, TSV or JSONL file or a directory of TXT or JSON
    files. For corpora with subjects expressed as labels, the given vocabulary
    language is used to convert subject labels into URIs. Several paths are
    merged into a combined corpus; with no paths at all, a warning is logged
    and the null device is read instead. When docs_limit is not None the
    corpus is wrapped in a LimitingDocumentCorpus."""

    def _corpus_for(path):
        """build the corpus object matching the kind of a single path"""
        if os.path.isdir(path):
            return annif.corpus.DocumentDirectory(
                path, subject_index, vocab_lang, require_subjects=True
            )
        if annif.corpus.DocumentFileCSV.is_csv_file(path):
            return annif.corpus.DocumentFileCSV(path, subject_index)
        if annif.corpus.DocumentFileJSONL.is_jsonl_file(path):
            return annif.corpus.DocumentFileJSONL(path, subject_index, vocab_lang)
        # not a directory, CSV or JSONL file: fall back to TSV
        return annif.corpus.DocumentFileTSV(path, subject_index)

    path_count = len(paths)
    if path_count > 1:
        docs = annif.corpus.CombinedCorpus([_corpus_for(path) for path in paths])
    elif path_count == 1:
        docs = _corpus_for(paths[0])
    else:
        logger.warning("Reading empty file")
        docs = _corpus_for(os.path.devnull)

    if docs_limit is None:
        return docs
    return annif.corpus.LimitingDocumentCorpus(docs, docs_limit)
168
169
170
def open_text_documents(paths: tuple[str, ...], docs_limit: int | None) -> DocumentList:
    """
    Helper function that reads text documents from the given file paths and
    returns them as a DocumentList of Documents without subjects. The path "-"
    stands for standard input. At most docs_limit paths are read. Files are
    decoded as UTF-8 (a leading BOM is skipped) with undecodable bytes replaced.
    """

    def _read_text(path):
        if path == "-":
            return sys.stdin.read()
        with open(path, errors="replace", encoding="utf-8-sig") as docfile:
            return docfile.read()

    limited_paths = paths[:docs_limit]
    # lazily evaluated: each path is read only when the document is consumed
    documents = (
        annif.corpus.Document(text=_read_text(path), subject_set=None)
        for path in limited_paths
    )
    return annif.corpus.DocumentList(documents)
188
189
190
def show_hits(
    hits: SuggestionResult,
    project: AnnifProject,
    lang: str,
    file: io.TextIOWrapper | None = None,
) -> None:
    """
    Print subject suggestions to the console or a file, one tab-separated row
    per hit. Each row holds the URI in angle brackets, the label in the
    specified language, the notation if the subject has one, and the score
    formatted with four decimals.
    """
    for hit in hits:
        subject = project.subjects[hit.subject_id]
        uri = subject.uri
        # an empty label or notation is dropped instead of leaving a blank column
        label_columns = [
            column for column in (subject.labels[lang], subject.notation) if column
        ]
        row = "<{}>\t{}\t{:.04f}".format(uri, "\t".join(label_columns), hit.score)
        click.echo(row, file=file)
210
211
212
def parse_backend_params(
    backend_param: tuple[str, ...] | tuple[()], project: AnnifProject
) -> collections.defaultdict[str, dict[str, str]]:
    """Parse a list of backend parameters given with the --backend-param
    option into a nested dict structure.

    Each item must have the form <backend>.<parameter>=<value>; it is split at
    the first "." and then at the first "=". The result maps backend name to a
    dict of parameter name to value, with values kept as strings.

    Raises click.BadParameter for an item that lacks the "." or the "="
    separator. Each backend name is also checked with
    _validate_backend_params."""
    backend_params = collections.defaultdict(dict)
    for beparam in backend_param:
        backend, dot, param = beparam.partition(".")
        key, equals, val = param.partition("=")
        if not dot or not equals:
            # report malformed syntax as a usage error rather than letting
            # tuple unpacking fail with an opaque ValueError
            raise click.BadParameter(
                f"--backend-param '{beparam}'. "
                "Expected <backend>.<parameter>=<value>."
            )
        _validate_backend_params(backend, beparam, project)
        backend_params[backend][key] = val
    return backend_params
224
225
226
def _validate_backend_params(backend: str, beparam: str, project: AnnifProject) -> None:
    """Raise ConfigurationException unless the backend named in a -b option
    is the backend set in the project configuration"""
    if backend == project.config["backend"]:
        return
    raise ConfigurationException(
        'The backend {} in CLI option "-b {}" not matching the project'
        " backend {}.".format(backend, beparam, project.config["backend"])
    )
232
233
234
def parse_metadata(metadata: tuple[str, ...] | tuple[()]) -> dict[str, str]:
    """Turn the <key>=<value> items given with the --metadata option into a
    dictionary. Each item is split at its first "="; an item without "="
    raises click.BadParameter. A repeated key keeps the last value."""

    parsed = {}
    for entry in metadata:
        key, separator, value = entry.partition("=")
        if not separator:
            raise click.BadParameter(
                f"--metadata '{entry}'. Expected <key>=<value>."
            )
        parsed[key] = value

    return parsed
246
247
248
def generate_filter_params(filter_batch_max_limit: int) -> list[tuple[int, float]]:
    """Return every (limit, threshold) pair, where limit runs from 1 up to and
    including filter_batch_max_limit and threshold takes the 20 values
    i * 0.05 for i in 0..19. Pairs are ordered by limit, then by threshold."""
    thresholds = [step * 0.05 for step in range(20)]
    return [
        (limit, threshold)
        for limit in range(1, filter_batch_max_limit + 1)
        for threshold in thresholds
    ]
252
253
254
def _get_completion_choices(
    param: Argument,
) -> dict[str, AnnifVocabulary] | dict[str, AnnifProject] | list:
    """Return the registry entries to offer as shell completion candidates for
    the given parameter: the projects for "project_id" and
    "project_ids_pattern", the vocabularies for "vocab_id", and an empty list
    for any other parameter"""
    name = param.name
    if name == "vocab_id":
        return annif.registry.get_vocabs()
    if name in ("project_id", "project_ids_pattern"):
        return annif.registry.get_projects()
    return []
263
264
265
def complete_param(ctx: Context, param: Argument, incomplete: str) -> list[str]:
    """Shell completion callback: within the app context, collect the
    completion choices for the parameter that start with the text typed so
    far"""
    matches = []
    with ctx.obj.load_app().app_context():
        for candidate in _get_completion_choices(param):
            if candidate.startswith(incomplete):
                matches.append(candidate)
    return matches
272