1 | """Utility functions for Annif CLI commands""" |
||
2 | |||
3 | from __future__ import annotations |
||
4 | |||
5 | import collections |
||
6 | import itertools |
||
7 | import os |
||
8 | import sys |
||
9 | from typing import TYPE_CHECKING |
||
10 | |||
11 | import click |
||
12 | import click_log |
||
13 | from flask import current_app |
||
14 | |||
15 | import annif |
||
16 | from annif.exception import ConfigurationException |
||
17 | from annif.project import Access |
||
18 | |||
19 | if TYPE_CHECKING: |
||
20 | import io |
||
21 | from datetime import datetime |
||
22 | |||
23 | from click.core import Argument, Context, Option |
||
24 | |||
25 | from annif.corpus.document import DocumentCorpus, DocumentList |
||
26 | from annif.corpus.subject import SubjectIndex |
||
27 | from annif.project import AnnifProject |
||
28 | from annif.suggestion import SuggestionResult |
||
29 | from annif.vocab import AnnifVocabulary |
||
30 | |||
31 | logger = annif.logger |
||
32 | |||
33 | |||
34 | def _set_project_config_file_path( |
||
35 | ctx: Context, param: Option, value: str | None |
||
36 | ) -> None: |
||
37 | """Override the default path or the path given in env by CLI option""" |
||
38 | with ctx.obj.load_app().app_context(): |
||
39 | if value: |
||
40 | current_app.config["PROJECTS_CONFIG_PATH"] = value |
||
41 | |||
42 | |||
def common_options(f):
    """Decorator that attaches the options shared by all CLI commands: the
    -p/--projects option for setting the project configuration path, and the
    verbosity option created by click_log for the module logger"""
    projects_option = click.option(
        "-p",
        "--projects",
        help="Set path to project configuration file or directory",
        type=click.Path(dir_okay=True, exists=True),
        callback=_set_project_config_file_path,
        expose_value=False,
        is_eager=True,
    )
    verbosity_option = click_log.simple_verbosity_option(logger)
    return verbosity_option(projects_option(f))
||
55 | |||
56 | |||
def project_id(f):
    """Decorator that adds a project_id argument, with shell completion handled
    by complete_param, to a CLI command"""
    add_argument = click.argument("project_id", shell_complete=complete_param)
    return add_argument(f)
||
60 | |||
61 | |||
def backend_param_option(f):
    """Decorator that adds the repeatable -b/--backend-param option, used for
    overriding backend parameters of the config file, to a CLI command"""
    help_text = (
        "Override backend parameter of the config file. "
        + "Syntax: `-b <backend>.<parameter>=<value>`."
    )
    add_option = click.option("--backend-param", "-b", multiple=True, help=help_text)
    return add_option(f)
||
71 | |||
72 | |||
def docs_limit_option(f):
    """Decorator that adds the -d/--docs-limit option, an integer with a lower
    bound of 0 giving the maximum number of documents to use, to a CLI command.
    The option defaults to None."""
    add_option = click.option(
        "--docs-limit",
        "-d",
        type=click.IntRange(0, None),
        default=None,
        help="Maximum number of documents to use",
    )
    return add_option(f)
||
83 | |||
84 | |||
def get_project(project_id: str) -> AnnifProject:
    """
    Helper function that fetches a project by its ID from the registry, using
    private as the minimum access level. If the lookup raises ValueError, a
    message is printed to standard error and the program exits with status 1."""
    try:
        project = annif.registry.get_project(project_id, min_access=Access.private)
    except ValueError:
        click.echo("No projects found with id '{0}'.".format(project_id), err=True)
        sys.exit(1)
    else:
        return project
||
93 | |||
94 | |||
def get_vocab(vocab_id: str) -> AnnifVocabulary:
    """
    Helper function that fetches a vocabulary by its ID from the registry, using
    private as the minimum access level. If the lookup raises ValueError, a
    message is printed to standard error and the program exits with status 1."""
    try:
        vocab = annif.registry.get_vocab(vocab_id, min_access=Access.private)
    except ValueError:
        click.echo(f"No vocabularies found with the id '{vocab_id}'.", err=True)
        sys.exit(1)
    else:
        return vocab
||
104 | |||
105 | |||
def make_list_template(*rows) -> str:
    """Helper function that builds a str.format template for printing rows as
    left-aligned columns. Each field is padded to the length of the longest item
    found at that position in the given rows, and the fields are separated by a
    single space. Without any rows the template is an empty string."""

    widths = []
    for row in rows:
        for position, item in enumerate(row):
            item_width = len(item)
            if position == len(widths):
                # First time this column is seen
                widths.append(item_width)
            elif item_width > widths[position]:
                widths[position] = item_width

    fields = [f"{{{position}: <{width}}}" for position, width in enumerate(widths)]
    return " ".join(fields)
||
122 | |||
123 | |||
def format_datetime(dt: datetime | None) -> str:
    """Helper function that renders a datetime in local time using the format
    "%Y-%m-%d %H:%M:%S". A missing datetime (None) is rendered as "-"."""
    if dt is not None:
        local_dt = dt.astimezone()
        return local_dt.strftime("%Y-%m-%d %H:%M:%S")
    return "-"
||
129 | |||
130 | |||
def open_documents(
    paths: tuple[str, ...],
    subject_index: SubjectIndex,
    vocab_lang: str,
    docs_limit: int | None,
) -> DocumentCorpus:
    """Helper function to open a document corpus from a list of pathnames.

    A directory is opened as a DocumentDirectory with subjects required; for
    directories with subjects in TSV files, the given vocabulary language will
    be used to convert subject labels into URIs. A file recognized as CSV is
    opened as a DocumentFileCSV and any other file as a DocumentFileTSV.

    With no paths, a warning is logged and the null device is read instead.
    Several paths are merged into a CombinedCorpus. When docs_limit is not None,
    the corpus is wrapped in a LimitingDocumentCorpus before it is returned."""

    def open_doc_path(path, subject_index):
        """open a single path, choosing the corpus class by the kind of path"""
        if os.path.isdir(path):
            return annif.corpus.DocumentDirectory(
                path, subject_index, vocab_lang, require_subjects=True
            )
        corpus_class = (
            annif.corpus.DocumentFileCSV
            if annif.corpus.DocumentFileCSV.is_csv_file(path)
            else annif.corpus.DocumentFileTSV
        )
        return corpus_class(path, subject_index)

    path_count = len(paths)
    if path_count > 1:
        docs = annif.corpus.CombinedCorpus(
            [open_doc_path(path, subject_index) for path in paths]
        )
    elif path_count == 1:
        docs = open_doc_path(paths[0], subject_index)
    else:
        logger.warning("Reading empty file")
        docs = open_doc_path(os.path.devnull, subject_index)

    if docs_limit is None:
        return docs
    return annif.corpus.LimitingDocumentCorpus(docs, docs_limit)
||
165 | |||
166 | |||
def open_text_documents(paths: tuple[str, ...], docs_limit: int | None) -> DocumentList:
    """
    Helper function to read text documents from the given file paths. Returns a
    DocumentList object whose Documents are created with subject_set=None. The
    path "-" stands for standard input; other paths are opened as UTF-8 text (a
    leading byte order mark is skipped, undecodable bytes are replaced). Only
    the first docs_limit paths are used; None means all of them. The documents
    are produced lazily, when the returned list is iterated.
    """

    def _read_text(path):
        """Return the whole text found at a single path"""
        if path == "-":
            return sys.stdin.read()
        with open(path, errors="replace", encoding="utf-8-sig") as docfile:
            return docfile.read()

    selected_paths = paths[:docs_limit]
    documents = (
        annif.corpus.Document(text=_read_text(path), subject_set=None)
        for path in selected_paths
    )
    return annif.corpus.DocumentList(documents)
||
185 | |||
186 | |||
def show_hits(
    hits: SuggestionResult,
    project: AnnifProject,
    lang: str,
    file: io.TextIOWrapper | None = None,
) -> None:
    """
    Print subject suggestions with click.echo, passing along the given file
    (None by default). One tab-separated line is written per hit, holding the
    subject URI in angle brackets, the label in the specified language, the
    notation if there is one, and the score with four decimals.
    """
    for hit in hits:
        subject = project.subjects[hit.subject_id]
        uri = subject.uri
        # Falsy fields (such as a missing notation) are left out of the line
        label_fields = [
            field for field in (subject.labels[lang], subject.notation) if field
        ]
        row = "<{}>\t{}\t{:.04f}".format(uri, "\t".join(label_fields), hit.score)
        click.echo(row, file=file)
||
207 | |||
208 | |||
def parse_backend_params(
    backend_param: tuple[str, ...] | tuple[()], project: AnnifProject
) -> collections.defaultdict[str, dict[str, str]]:
    """Parse a list of backend parameters given with the --backend-param
    option into a nested dict structure.

    Each entry must have the form `<backend>.<parameter>=<value>`. An entry is
    split at its first "." and the remainder at its first "=", so the value may
    itself contain either character. The backend part of every entry is checked
    against the project with _validate_backend_params.

    Returns a defaultdict mapping the backend name to a dict of parameter names
    and their (string) values. Raises click.BadParameter for an entry that lacks
    the "." or the "=" separator."""
    backend_params = collections.defaultdict(dict)
    for beparam in backend_param:
        backend, dot, param = beparam.partition(".")
        key, equals, val = param.partition("=")
        if not dot or not equals:
            # Report malformed input as a usage error instead of letting tuple
            # unpacking fail with an unhelpful ValueError
            raise click.BadParameter(
                f"--backend-param '{beparam}'. "
                "Expected <backend>.<parameter>=<value>."
            )
        _validate_backend_params(backend, beparam, project)
        backend_params[backend][key] = val
    return backend_params
||
221 | |||
222 | |||
223 | def _validate_backend_params(backend: str, beparam: str, project: AnnifProject) -> None: |
||
224 | if backend != project.config["backend"]: |
||
225 | raise ConfigurationException( |
||
226 | 'The backend {} in CLI option "-b {}" not matching the project' |
||
227 | " backend {}.".format(backend, beparam, project.config["backend"]) |
||
228 | ) |
||
229 | |||
230 | |||
def parse_metadata(metadata: tuple[str, ...] | tuple[()]) -> dict[str, str]:
    """Parse a list of metadata parameters given with the --metadata
    option into a dictionary.

    Each item is split at its first "=" into a key and a value; when a key is
    repeated, the last value is kept. An item without "=" raises
    click.BadParameter."""

    parsed = {}
    for item in metadata:
        key, separator, value = item.partition("=")
        if not separator:
            raise click.BadParameter(f"--metadata '{item}'. Expected <key>=<value>.")
        parsed[key] = value

    return parsed
||
243 | |||
244 | |||
def generate_filter_params(filter_batch_max_limit: int) -> list[tuple[int, float]]:
    """Return every (limit, threshold) combination, where the limits run from 1
    to filter_batch_max_limit and the thresholds are i * 0.05 for i from 0 to 19.
    The pairs are ordered by limit first and by threshold second."""
    return [
        (limit, step * 0.05)
        for limit in range(1, filter_batch_max_limit + 1)
        for step in range(20)
    ]
||
249 | |||
250 | |||
251 | def _get_completion_choices( |
||
252 | param: Argument, |
||
253 | ) -> dict[str, AnnifVocabulary] | dict[str, AnnifProject] | list: |
||
254 | if param.name in ("project_id", "project_ids_pattern"): |
||
255 | return annif.registry.get_projects() |
||
256 | elif param.name == "vocab_id": |
||
257 | return annif.registry.get_vocabs() |
||
258 | else: |
||
259 | return [] |
||
260 | |||
261 | |||
def complete_param(ctx: Context, param: Argument, incomplete: str) -> list[str]:
    """Shell completion callback: return the completion choices of the given
    parameter that start with the incomplete text. The choices are looked up
    inside the context of the application loaded from ctx.obj."""
    matches = []
    with ctx.obj.load_app().app_context():
        for choice in _get_completion_choices(param):
            if choice.startswith(incomplete):
                matches.append(choice)
    return matches
||
269 |