annif.cli_util   A
last analyzed

Complexity

Total Complexity 41

Size/Duplication

Total Lines 268
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 166
dl 0
loc 268
rs 9.1199
c 0
b 0
f 0
wmc 41

18 Functions

Rating   Name   Duplication   Size   Complexity  
A format_datetime() 0 5 2
A get_vocab() 0 9 2
A _set_project_config_file_path() 0 7 3
A make_list_template() 0 14 3
A backend_param_option() 0 9 1
A docs_limit_option() 0 10 1
A common_options() 0 12 1
A get_project() 0 8 2
A project_id() 0 3 1
A _validate_backend_params() 0 5 2
B open_documents() 0 34 6
A parse_backend_params() 0 12 2
A open_text_documents() 0 18 4
A show_hits() 0 20 2
A generate_filter_params() 0 4 1
A _get_completion_choices() 0 9 3
A parse_metadata() 0 12 3
A complete_param() 0 6 2

How to fix   Complexity   

Complexity

Complex classes like annif.cli_util often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Utility functions for Annif CLI commands"""
2
3
from __future__ import annotations
4
5
import collections
6
import itertools
7
import os
8
import sys
9
from typing import TYPE_CHECKING
10
11
import click
12
import click_log
13
from flask import current_app
14
15
import annif
16
from annif.exception import ConfigurationException
17
from annif.project import Access
18
19
if TYPE_CHECKING:
20
    import io
21
    from datetime import datetime
22
23
    from click.core import Argument, Context, Option
24
25
    from annif.corpus.document import DocumentCorpus, DocumentList
26
    from annif.corpus.subject import SubjectIndex
27
    from annif.project import AnnifProject
28
    from annif.suggestion import SuggestionResult
29
    from annif.vocab import AnnifVocabulary
30
31
# Shared Annif logger: handed to click_log's verbosity option and used for warnings
logger = annif.logger
32
33
34
def _set_project_config_file_path(
    ctx: Context, param: Option, value: str | None
) -> None:
    """Override the default path or the path given in env by CLI option.

    Used as the callback of the --projects option. The application is loaded via
    ``ctx.obj.load_app()`` and its application context is entered even when no
    value was given; ``PROJECTS_CONFIG_PATH`` in the app config is only written
    when ``value`` is truthy.

    Args:
        ctx: click context whose ``obj`` provides ``load_app()``.
        param: the option that triggered the callback (not used here).
        value: the path given on the command line, or None if omitted.
    """
    with ctx.obj.load_app().app_context():
        if value:
            current_app.config["PROJECTS_CONFIG_PATH"] = value
41
42
43
def common_options(f):
    """Decorator to add common options for all CLI commands.

    Adds the eager ``-p/--projects`` option (its value is consumed by the
    callback and not passed on to the command) and then the verbosity option
    provided by click_log for the module logger.
    """
    projects_option = click.option(
        "-p",
        "--projects",
        help="Set path to project configuration file or directory",
        type=click.Path(dir_okay=True, exists=True),
        callback=_set_project_config_file_path,
        expose_value=False,
        is_eager=True,
    )
    verbosity_option = click_log.simple_verbosity_option(logger)
    # The projects option is applied first, the verbosity option wraps the result
    return verbosity_option(projects_option(f))
55
56
57
def project_id(f):
    """Decorator to add a project ID parameter to a CLI command.

    The argument is named ``project_id`` and uses ``complete_param`` for shell
    completion.
    """
    project_id_argument = click.argument("project_id", shell_complete=complete_param)
    return project_id_argument(f)
60
61
62
def backend_param_option(f):
    """Decorator to add an option for CLI commands to override backend parameters.

    The option may be given multiple times; each occurrence is expected to follow
    the syntax shown in the help text.
    """
    return click.option(
        "--backend-param",
        "-b",
        multiple=True,
        help="Override backend parameter of the config file. "
        "Syntax: `-b <backend>.<parameter>=<value>`.",
    )(f)
71
72
73
def docs_limit_option(f):
    """Decorator to add an option for limiting the number of documents to use.

    The option accepts a non-negative integer and defaults to None.
    """
    return click.option(
        "--docs-limit",
        "-d",
        default=None,
        type=click.IntRange(0, None),
        help="Maximum number of documents to use",
    )(f)
83
84
85
def get_project(project_id: str) -> AnnifProject:
    """Helper function to get a project by ID and bail out if it doesn't exist.

    The project is looked up in the registry with private minimum access. If the
    lookup raises ValueError, a message is written to stderr and the process
    exits with status 1.
    """
    try:
        return annif.registry.get_project(project_id, min_access=Access.private)
    except ValueError:
        click.echo(f"No projects found with id '{project_id}'.", err=True)
        sys.exit(1)
93
94
95
def get_vocab(vocab_id: str) -> AnnifVocabulary:
    """Helper function to get a vocabulary by ID and bail out if it doesn't exist.

    The vocabulary is looked up in the registry with private minimum access. If
    the lookup raises ValueError, a message is written to stderr and the process
    exits with status 1.
    """
    try:
        return annif.registry.get_vocab(vocab_id, min_access=Access.private)
    except ValueError:
        click.echo(f"No vocabularies found with the id '{vocab_id}'.", err=True)
        sys.exit(1)
104
105
106
def make_list_template(*rows) -> str:
    """Helper function to create a template for a list of entries with fields of
    variable width. The width of each field is determined by the longest item in the
    field in the given rows.

    The result is a ``str.format`` template with one left-aligned, space-padded
    placeholder per field, joined by two spaces. Rows may differ in length; with
    no rows the template is the empty string.
    """
    max_field_widths: dict[int, int] = {}
    for row in rows:
        for field_ind, item in enumerate(row):
            widest_so_far = max_field_widths.get(field_ind, 0)
            max_field_widths[field_ind] = max(widest_so_far, len(item))

    # Doubled braces produce literal braces, e.g. "{0: <5}" for field 0, width 5
    return "  ".join(
        f"{{{field_ind}: <{field_width}}}"
        for field_ind, field_width in max_field_widths.items()
    )
122
123
124
def format_datetime(dt: datetime | None) -> str:
    """Helper function to format a datetime object as a string in the local time.

    Returns "-" when no datetime is given. Otherwise the value is converted with
    ``astimezone()`` (no argument, i.e. to the system local time zone) and
    formatted as ``YYYY-MM-DD HH:MM:SS``.
    """
    if dt is None:
        return "-"
    return dt.astimezone().strftime("%Y-%m-%d %H:%M:%S")
129
130
131
def open_documents(
    paths: tuple[str, ...],
    subject_index: SubjectIndex,
    vocab_lang: str,
    docs_limit: int | None,
) -> DocumentCorpus:
    """Helper function to open a document corpus from a list of pathnames,
    each of which is either a TSV or CSV file or a directory of TXT files. For
    directories with subjects in TSV files, the given vocabulary language
    will be used to convert subject labels into URIs. The corpus will be
    returned as an instance of DocumentCorpus or LimitingDocumentCorpus.

    With no paths a warning is logged and the null device is opened instead, so
    a corpus object is still returned. A single path is opened directly, several
    paths are wrapped in a CombinedCorpus. The result is wrapped in a
    LimitingDocumentCorpus whenever docs_limit is not None (including 0).
    """

    def open_doc_path(path):
        """Open a single path and return it as a DocumentCorpus"""
        if os.path.isdir(path):
            return annif.corpus.DocumentDirectory(
                path, subject_index, vocab_lang, require_subjects=True
            )
        if annif.corpus.DocumentFileCSV.is_csv_file(path):
            return annif.corpus.DocumentFileCSV(path, subject_index)
        # Anything that is neither a directory nor a CSV file is read as TSV
        return annif.corpus.DocumentFileTSV(path, subject_index)

    if not paths:
        logger.warning("Reading empty file")
        docs = open_doc_path(os.path.devnull)
    elif len(paths) == 1:
        docs = open_doc_path(paths[0])
    else:
        corpora = [open_doc_path(path) for path in paths]
        docs = annif.corpus.CombinedCorpus(corpora)
    if docs_limit is not None:
        docs = annif.corpus.LimitingDocumentCorpus(docs, docs_limit)
    return docs
165
166
167
def open_text_documents(paths: tuple[str, ...], docs_limit: int | None) -> DocumentList:
    """
    Helper function to read text documents from the given file paths. Returns a
    DocumentList object with Documents having no subjects. If a path is "-", the
    document text is read from standard input. The maximum number of documents to read
    is set by docs_limit parameter.

    Only the first docs_limit paths are considered (all of them when docs_limit is
    None). Files are decoded as UTF-8 with an optional BOM, and undecodable bytes
    are replaced rather than raising an error. Documents are produced by a
    generator, so a file is not opened before its document is requested.
    """

    def _docs(selected_paths):
        for path in selected_paths:
            if path == "-":
                text = sys.stdin.read()
            else:
                with open(path, errors="replace", encoding="utf-8-sig") as docfile:
                    text = docfile.read()
            yield annif.corpus.Document(text=text, subject_set=None)

    return annif.corpus.DocumentList(_docs(paths[:docs_limit]))
185
186
187
def show_hits(
    hits: SuggestionResult,
    project: AnnifProject,
    lang: str,
    file: io.TextIOWrapper | None = None,
) -> None:
    """
    Print subject suggestions to the console or a file. One tab-separated line is
    written per hit, containing the URI in angle brackets, the label in the
    specified language, the notation and the score with four decimals. An empty
    label or notation is left out of the line.
    """
    # NOTE: ``io`` is imported only under TYPE_CHECKING. That is safe here because
    # the module uses ``from __future__ import annotations``, so the annotation
    # above is never evaluated at runtime.
    template = "<{}>\t{}\t{:.04f}"
    for hit in hits:
        subj = project.subjects[hit.subject_id]
        # filter(None, ...) drops falsy entries such as a missing notation
        label_and_notation = "\t".join(
            filter(None, (subj.labels[lang], subj.notation))
        )
        line = template.format(subj.uri, label_and_notation, hit.score)
        click.echo(line, file=file)
207
208
209
def parse_backend_params(
    backend_param: tuple[str, ...] | tuple[()], project: AnnifProject
) -> collections.defaultdict[str, dict[str, str]]:
    """Parse a list of backend parameters given with the --backend-param
    option into a nested dict structure.

    Each entry must have the form ``<backend>.<parameter>=<value>``. The entry is
    split at its first "." and the remainder at its first "=", so the value may
    itself contain further "." or "=" characters.

    Args:
        backend_param: the raw option values.
        project: the project whose configured backend the entries must match.

    Returns:
        A defaultdict mapping backend name to a dict of parameter name -> value.

    Raises:
        click.BadParameter: if an entry does not follow the expected syntax.
        ConfigurationException: if the backend of an entry differs from the
            backend of the project (raised by _validate_backend_params).
    """
    backend_params = collections.defaultdict(dict)
    for beparam in backend_param:
        backend, dot, param = beparam.partition(".")
        key, equals, val = param.partition("=")
        if not dot or not equals:
            # Report malformed input to the user instead of failing with an
            # unpacking ValueError traceback
            raise click.BadParameter(
                f"--backend-param '{beparam}'. "
                "Expected <backend>.<parameter>=<value>."
            )
        _validate_backend_params(backend, beparam, project)
        backend_params[backend][key] = val
    return backend_params
221
222
223
def _validate_backend_params(backend: str, beparam: str, project: AnnifProject) -> None:
    """Check that a backend named in a -b option is the backend of the project.

    Args:
        backend: the backend name taken from the option value.
        beparam: the complete option value, used only in the error message.
        project: the project; its ``config["backend"]`` is compared to backend.

    Raises:
        ConfigurationException: if the two backend names differ.
    """
    project_backend = project.config["backend"]
    if backend != project_backend:
        raise ConfigurationException(
            'The backend {} in CLI option "-b {}" not matching the project'
            " backend {}.".format(backend, beparam, project_backend)
        )
229
230
231
def parse_metadata(metadata: tuple[str, ...] | tuple[()]) -> dict[str, str]:
    """Parse a list of metadata parameters given with the --metadata
    option into a dictionary.

    Each item is split at its first "=", so values may contain further "="
    characters. If a key occurs more than once, the last value wins.

    Raises:
        click.BadParameter: if an item contains no "=".
    """
    metadata_dict = {}
    for item in metadata:
        key, separator, value = item.partition("=")
        if not separator:
            raise click.BadParameter(f"--metadata '{item}'. Expected <key>=<value>.")
        metadata_dict[key] = value

    return metadata_dict
243
244
245
def generate_filter_params(filter_batch_max_limit: int) -> list[tuple[int, float]]:
    """Generate all (limit, threshold) combinations for a batch of filters.

    Limits run from 1 to filter_batch_max_limit inclusive; thresholds are the 20
    values ``i * 0.05`` for i in 0..19. The result is ordered by limit first and
    is empty when filter_batch_max_limit is less than 1.
    """
    limits = range(1, filter_batch_max_limit + 1)
    thresholds = [i * 0.05 for i in range(20)]
    return list(itertools.product(limits, thresholds))
249
250
251
def _get_completion_choices(
    param: Argument,
) -> dict[str, AnnifVocabulary] | dict[str, AnnifProject] | list:
    """Return the candidates for shell completion of the given parameter.

    Projects from the registry are returned for parameters named "project_id" or
    "project_ids_pattern", vocabularies for "vocab_id", and an empty list for any
    other parameter name.
    """
    if param.name in ("project_id", "project_ids_pattern"):
        return annif.registry.get_projects()
    if param.name == "vocab_id":
        return annif.registry.get_vocabs()
    return []
260
261
262
def complete_param(ctx: Context, param: Argument, incomplete: str) -> list[str]:
    """Shell completion callback returning the choices that match the typed prefix.

    The application is loaded via ``ctx.obj.load_app()`` and the candidates are
    collected inside its application context. Iterating the candidates yields
    strings (for the registry mappings, their keys); those starting with
    ``incomplete`` are returned.
    """
    with ctx.obj.load_app().app_context():
        return [
            choice
            for choice in _get_completion_choices(param)
            if choice.startswith(incomplete)
        ]
269