Passed
Push — issue686-cli-command-list-proj... ( 267ee5...0cc9fe )
by Juho
05:55 queued 03:04
created

annif.cli_util.project_id()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
"""Utility functions for Annif CLI commands"""
2
3
4
import collections
5
import itertools
6
import os
7
import sys
8
9
import click
10
import click_log
11
from flask import current_app
12
13
import annif
14
from annif.exception import ConfigurationException
15
from annif.project import Access
16
17
logger = annif.logger
18
19
20
def _set_project_config_file_path(ctx, param, value):
    """Click callback for the --projects option: when a path is given, store it
    as PROJECTS_CONFIG_PATH in the application config, overriding the default
    path or the path given in env"""
    app = ctx.obj.load_app()
    with app.app_context():
        if not value:
            # Option not given: keep whatever path is already configured.
            return
        current_app.config["PROJECTS_CONFIG_PATH"] = value
25
26
27
def common_options(f):
    """Decorator that attaches the options shared by all CLI commands: the
    -p/--projects option and the verbosity option from click_log"""
    projects_option = click.option(
        "-p",
        "--projects",
        help="Set path to project configuration file or directory",
        type=click.Path(dir_okay=True, exists=True),
        callback=_set_project_config_file_path,
        # The callback stores the path itself, so the value is not passed on
        # to the command function.
        expose_value=False,
        is_eager=True,
    )
    verbosity_option = click_log.simple_verbosity_option(logger)
    return verbosity_option(projects_option(f))
39
40
41
def project_id(f):
    """Decorator that gives a CLI command a "project_id" argument whose value
    can be shell-completed by complete_param"""
    add_argument = click.argument("project_id", shell_complete=complete_param)
    return add_argument(f)
44
45
46
def backend_param_option(f):
    """Decorator that gives a CLI command a repeatable -b/--backend-param option
    for overriding backend parameters"""
    help_text = (
        "Override backend parameter of the config file. "
        "Syntax: `-b <backend>.<parameter>=<value>`."
    )
    add_option = click.option(
        "--backend-param",
        "-b",
        multiple=True,
        help=help_text,
    )
    return add_option(f)
55
56
57
def docs_limit_option(f):
    """Decorator that gives a CLI command a -d/--docs-limit option, a
    non-negative integer capping the number of documents to use (no limit by
    default)"""
    non_negative_int = click.IntRange(0, None)
    add_option = click.option(
        "--docs-limit",
        "-d",
        default=None,
        type=non_negative_int,
        help="Maximum number of documents to use",
    )
    return add_option(f)
67
68
69
def get_project(project_id):
    """
    Helper function that fetches a project by ID from the registry. If the
    lookup raises ValueError, an error message is printed to stderr and the
    process exits with status 1."""
    try:
        project = annif.registry.get_project(project_id, min_access=Access.private)
    except ValueError:
        click.echo(f"No projects found with id '{project_id}'.", err=True)
        sys.exit(1)
    return project
77
78
79
def get_vocab(vocab_id):
    """
    Helper function that fetches a vocabulary by ID from the registry. If the
    lookup raises ValueError, an error message is printed to stderr and the
    process exits with status 1."""
    try:
        vocab = annif.registry.get_vocab(vocab_id, min_access=Access.private)
    except ValueError:
        message = f"No vocabularies found with the id '{vocab_id}'."
        click.echo(message, err=True)
        sys.exit(1)
    return vocab
88
89
90
def make_list_template(*rows):
    """Helper function that builds a str.format template for printing rows as
    left-aligned columns separated by two spaces. Each column is as wide as the
    longest item found at that position in the given rows; with no rows the
    template is an empty string."""

    column_widths = []
    for row in rows:
        for position, item in enumerate(row):
            if position == len(column_widths):
                # First time this column position is seen.
                column_widths.append(0)
            column_widths[position] = max(column_widths[position], len(item))

    placeholders = (
        f"{{{position}: <{width}}}" for position, width in enumerate(column_widths)
    )
    return "  ".join(placeholders)
106
107
108
def format_datetime(dt):
    """Helper function that renders a datetime in the local time zone as
    "YYYY-MM-DD HH:MM:SS"; a missing value (None) is rendered as "-"."""
    return "-" if dt is None else dt.astimezone().strftime("%Y-%m-%d %H:%M:%S")
113
114
115
def open_documents(paths, subject_index, vocab_lang, docs_limit):
    """Helper function that opens a document corpus from a list of pathnames,
    each of which is either a TSV file or a directory of TXT files. For
    directories with subjects in TSV files, the given vocabulary language
    is used to convert subject labels into URIs. Several paths are merged
    into one combined corpus, and when docs_limit is not None the result is
    wrapped in a LimitingDocumentCorpus."""

    def _open_single(path):
        """open one path as a corpus: directory or single document file"""
        if not os.path.isdir(path):
            return annif.corpus.DocumentFile(path, subject_index)
        return annif.corpus.DocumentDirectory(
            path, subject_index, vocab_lang, require_subjects=True
        )

    path_count = len(paths)
    if path_count > 1:
        corpus = annif.corpus.CombinedCorpus([_open_single(path) for path in paths])
    elif path_count == 1:
        corpus = _open_single(paths[0])
    else:
        # No paths given: fall back to the null device as an empty input file.
        logger.warning("Reading empty file")
        corpus = _open_single(os.path.devnull)

    if docs_limit is None:
        return corpus
    return annif.corpus.LimitingDocumentCorpus(corpus, docs_limit)
141
142
143
def open_text_documents(paths, docs_limit):
    """
    Helper function that reads text documents from the given file paths and
    returns them as a DocumentList of Documents without subjects. The path "-"
    means the document text is read from standard input. At most docs_limit
    paths are read; None means no limit.
    """

    def _read_text(path):
        """return the text behind one path, "-" standing for standard input"""
        if path == "-":
            return sys.stdin.read()
        with open(path, errors="replace", encoding="utf-8-sig") as docfile:
            return docfile.read()

    selected_paths = paths[:docs_limit]
    # Lazy generator: each source is read only when the list is iterated.
    documents = (
        annif.corpus.Document(text=_read_text(path), subject_set=None)
        for path in selected_paths
    )
    return annif.corpus.DocumentList(documents)
161
162
163
def show_hits(hits, project, lang, file=None):
    """
    Print subject suggestions to the console or a file, one tab-separated row
    per hit. Each row holds the URI in angle brackets, the label in the given
    language, the notation (when the subject has one), and the score.
    """
    for hit in hits:
        subject = project.subjects[hit.subject_id]
        # Skip empty fields so that no blank column is printed.
        described_by = [
            field for field in (subject.labels[lang], subject.notation) if field
        ]
        description = "\t".join(described_by)
        click.echo(f"<{subject.uri}>\t{description}\t{hit.score}", file=file)
177
178
179
def parse_backend_params(backend_param, project):
    """Parse a list of backend parameters given with the --backend-param
    option into a nested dict structure.

    Each item must have the form `<backend>.<parameter>=<value>`. The item is
    split at the first "." and the remainder at the first "=", so the value
    may itself contain "." or "=" characters. Values are kept as strings.

    Returns a defaultdict mapping backend -> {parameter: value}.

    Raises ConfigurationException when an item does not follow the expected
    syntax, in addition to whatever _validate_backend_params raises for a
    backend that does not match the project."""
    backend_params = collections.defaultdict(dict)
    for beparam in backend_param:
        backend, dot, param = beparam.partition(".")
        key, equals, val = param.partition("=")
        if not dot or not equals:
            # Report malformed input as a configuration problem instead of
            # failing with an unpacking ValueError.
            raise ConfigurationException(
                f'Invalid syntax in CLI option "-b {beparam}". '
                "Expected `-b <backend>.<parameter>=<value>`."
            )
        _validate_backend_params(backend, beparam, project)
        backend_params[backend][key] = val
    return backend_params
189
190
191
def _validate_backend_params(backend, beparam, project):
    """Raise ConfigurationException unless the backend named in a -b option is
    the backend configured for the project"""
    project_backend = project.config["backend"]
    if backend == project_backend:
        return
    raise ConfigurationException(
        f'The backend {backend} in CLI option "-b {beparam}" not matching the project'
        f" backend {project_backend}."
    )
197
198
199
def generate_filter_params(filter_batch_max_limit):
    """Return every (limit, threshold) pair as a list, with limits running from
    1 to filter_batch_max_limit and thresholds being i * 0.05 for i in 0..19.
    Pairs are ordered by limit first, then by threshold."""
    thresholds = [step * 0.05 for step in range(20)]
    return [
        (limit, threshold)
        for limit in range(1, filter_batch_max_limit + 1)
        for threshold in thresholds
    ]
203
204
205
def _get_completion_choices(param):
    """Return the candidates for shell completion of the given parameter:
    the registry's projects for "project_id", its vocabularies for "vocab_id",
    and an empty list for any other parameter"""
    name = param.name
    if name == "project_id":
        return annif.registry.get_projects()
    if name == "vocab_id":
        return annif.registry.get_vocabs()
    return []
212
213
214
def complete_param(ctx, param, incomplete):
    """Shell completion callback: return the completion choices for the
    parameter that start with the text typed so far"""
    app = ctx.obj.load_app()
    with app.app_context():
        matches = []
        for candidate in _get_completion_choices(param):
            if candidate.startswith(incomplete):
                matches.append(candidate)
        return matches
221