1 | """Utility functions for Annif CLI commands""" |
||
2 | |||
3 | from __future__ import annotations |
||
4 | |||
5 | import collections |
||
6 | import gzip |
||
7 | import itertools |
||
8 | import os |
||
9 | import re |
||
10 | import sys |
||
11 | from contextlib import nullcontext |
||
12 | from typing import TYPE_CHECKING, Optional, TextIO |
||
13 | |||
14 | import click |
||
15 | import click_log |
||
16 | from flask import current_app |
||
17 | |||
18 | import annif |
||
19 | from annif.exception import ConfigurationException |
||
20 | from annif.project import Access |
||
21 | |||
22 | if TYPE_CHECKING: |
||
23 | import io |
||
24 | from datetime import datetime |
||
25 | |||
26 | from click.core import Argument, Context, Option |
||
27 | |||
28 | from annif.corpus.document import DocumentCorpus, DocumentList |
||
29 | from annif.corpus.subject import SubjectIndex |
||
30 | from annif.project import AnnifProject |
||
31 | from annif.suggestion import SuggestionResult |
||
32 | from annif.vocab import AnnifVocabulary |
||
33 | |||
34 | logger = annif.logger |
||
35 | |||
36 | |||
def _set_project_config_file_path(
    ctx: Context, param: Option, value: str | None
) -> None:
    """Click option callback: use the projects path given on the command line.

    The application is loaded and its context entered on every call. The
    ``PROJECTS_CONFIG_PATH`` setting is overwritten only when ``value`` is
    truthy; otherwise the existing configuration is left as it is.
    """
    app = ctx.obj.load_app()
    with app.app_context():
        if not value:
            # Option not given: keep whatever path is already configured
            return
        current_app.config["PROJECTS_CONFIG_PATH"] = value
||
44 | |||
45 | |||
def common_options(f):
    """Decorator that attaches the options shared by all CLI commands.

    Adds the eager ``-p``/``--projects`` option, whose value is consumed by the
    ``_set_project_config_file_path`` callback instead of being passed to the
    command, and then the verbosity option provided by click_log.
    """
    projects_option = click.option(
        "-p",
        "--projects",
        help="Set path to project configuration file or directory",
        type=click.Path(dir_okay=True, exists=True),
        callback=_set_project_config_file_path,
        expose_value=False,
        is_eager=True,
    )
    verbosity_option = click_log.simple_verbosity_option(logger)
    return verbosity_option(projects_option(f))
||
58 | |||
59 | |||
def project_id(f):
    """Decorator that adds a ``project_id`` argument to a CLI command.

    Shell completion for the argument is delegated to ``complete_param``.
    """
    add_argument = click.argument("project_id", shell_complete=complete_param)
    return add_argument(f)
||
63 | |||
64 | |||
def backend_param_option(f):
    """Decorator that adds the repeatable ``--backend-param``/``-b`` option.

    The option lets a CLI command override backend parameters of the
    configuration file.
    """
    help_text = (
        "Override backend parameter of the config file. "
        "Syntax: `-b <backend>.<parameter>=<value>`."
    )
    add_option = click.option(
        "--backend-param",
        "-b",
        multiple=True,
        help=help_text,
    )
    return add_option(f)
||
74 | |||
75 | |||
def docs_limit_option(f):
    """Decorator that adds the ``--docs-limit``/``-d`` option to a CLI command.

    The option takes a non-negative integer giving the maximum number of
    documents to use and defaults to None when it is not given.
    """
    non_negative_int = click.IntRange(0, None)
    add_option = click.option(
        "--docs-limit",
        "-d",
        default=None,
        type=non_negative_int,
        help="Maximum number of documents to use",
    )
    return add_option(f)
||
86 | |||
87 | |||
def get_project(project_id: str) -> AnnifProject:
    """Look up a project by its ID, exiting the program if the lookup fails.

    When the registry raises ValueError, an error message is written to
    standard error and the process exits with status 1.
    """
    try:
        project = annif.registry.get_project(project_id, min_access=Access.private)
    except ValueError:
        message = "No projects found with id '{0}'.".format(project_id)
        click.echo(message, err=True)
        sys.exit(1)
    return project
||
96 | |||
97 | |||
def get_vocab(vocab_id: str) -> AnnifVocabulary:
    """Look up a vocabulary by its ID, exiting the program if the lookup fails.

    When the registry raises ValueError, an error message is written to
    standard error and the process exits with status 1.
    """
    try:
        vocab = annif.registry.get_vocab(vocab_id, min_access=Access.private)
    except ValueError:
        message = f"No vocabularies found with the id '{vocab_id}'."
        click.echo(message, err=True)
        sys.exit(1)
    return vocab
||
107 | |||
108 | |||
def make_list_template(*rows) -> str:
    """Build a ``str.format`` template for printing rows as aligned columns.

    Each column gets a left-aligned, space-padded placeholder whose width is
    the length of the longest item found in that column among the given rows.
    The placeholders are separated by single spaces.
    """
    widths: dict[int, int] = {}
    for row in rows:
        for column, cell in enumerate(row):
            widths[column] = max(widths.get(column, 0), len(cell))

    placeholders = (f"{{{column}: <{width}}}" for column, width in widths.items())
    return " ".join(placeholders)
||
125 | |||
126 | |||
def format_datetime(dt: datetime | None) -> str:
    """Format a datetime in local time as ``YYYY-MM-DD HH:MM:SS``.

    Returns ``"-"`` when no datetime is given.
    """
    if dt is not None:
        local_dt = dt.astimezone()
        return local_dt.strftime("%Y-%m-%d %H:%M:%S")
    return "-"
||
132 | |||
133 | |||
def open_doc_path(path, subject_index, vocab_lang, require_subjects=True):
    """Open a single path and return the matching corpus object.

    A directory is opened as a DocumentDirectory. Otherwise the path is opened
    as a DocumentFileCSV or DocumentFileJSONL when the respective file type
    check accepts it, and as a DocumentFileTSV in all remaining cases.
    """
    if os.path.isdir(path):
        directory_cls = annif.corpus.DocumentDirectory
        return directory_cls(path, subject_index, vocab_lang, require_subjects)

    if annif.corpus.DocumentFileCSV.is_csv_file(path):
        return annif.corpus.DocumentFileCSV(path, subject_index, require_subjects)

    if annif.corpus.DocumentFileJSONL.is_jsonl_file(path):
        jsonl_cls = annif.corpus.DocumentFileJSONL
        return jsonl_cls(path, subject_index, vocab_lang, require_subjects)

    # Neither a directory, CSV nor JSONL: fall back to TSV
    return annif.corpus.DocumentFileTSV(path, subject_index, require_subjects)
||
148 | |||
149 | |||
def open_documents(
    paths: tuple[str, ...],
    subject_index: SubjectIndex,
    vocab_lang: str,
    docs_limit: int | None,
) -> DocumentCorpus:
    """Open a document corpus from a sequence of pathnames.

    Each path is either a CSV, TSV or JSONL file or a directory of TXT or JSON
    files. For corpora with subjects expressed as labels, the given vocabulary
    language will be used to convert subject labels into URIs. Several paths
    are combined into one corpus; with no paths at all a warning is logged and
    the null device is opened instead. The result is wrapped in a
    LimitingDocumentCorpus when docs_limit is not None.
    """
    path_count = len(paths)
    if path_count == 0:
        logger.warning("Reading empty file")
        corpus = open_doc_path(os.path.devnull, subject_index, vocab_lang)
    elif path_count == 1:
        corpus = open_doc_path(paths[0], subject_index, vocab_lang)
    else:
        parts = [open_doc_path(path, subject_index, vocab_lang) for path in paths]
        corpus = annif.corpus.CombinedCorpus(parts)

    if docs_limit is None:
        return corpus
    return annif.corpus.LimitingDocumentCorpus(corpus, docs_limit)
||
174 | |||
175 | |||
def open_text_documents(paths: tuple[str, ...], docs_limit: int | None) -> DocumentList:
    """Read text documents from the given file paths.

    Returns a DocumentList of Documents that have no subjects. The path "-"
    means that the document text is read from standard input. At most
    docs_limit paths are used. Files are read lazily, as the documents are
    iterated.
    """

    def _read_text(path):
        # "-" is the conventional placeholder for standard input
        if path == "-":
            return sys.stdin.read()
        with open(path, errors="replace", encoding="utf-8-sig") as docfile:
            return docfile.read()

    def _documents(selected_paths):
        for path in selected_paths:
            yield annif.corpus.Document(text=_read_text(path), subject_set=None)

    selected = paths[:docs_limit]
    return annif.corpus.DocumentList(_documents(selected))
||
194 | |||
195 | |||
def get_output_stream(
    path: str, suffix: str, output: Optional[str], use_gzip: bool, force: bool
) -> Optional[TextIO]:
    """Return a writable output stream based on the output option.

    path is the input document path, used to derive the output file name when
    no explicit output is given: the extension of the file name (including a
    trailing ".gz") is replaced by suffix. output is either "-" for standard
    output, an explicit output file name, or an empty value. With use_gzip the
    file is gzip-compressed and its name gets a ".gz" ending unless it already
    has one. Unless force is set, an existing file is not overwritten.

    Returns a context manager wrapping sys.stdout when output is "-", an open
    UTF-8 text stream for the output file otherwise, or None (after printing a
    notice) when the file exists and force is not set.
    """

    if output == "-":
        # nullcontext lets callers use "with" without closing stdout on exit
        return nullcontext(sys.stdout)

    if output:
        outfilename = output
    else:
        # Strip the extension from the file name only. Applying the pattern to
        # the whole path would let it match across directory separators, so
        # that "dir.v1/doc" would turn into "dir" + suffix.
        basename = os.path.basename(path)
        dirpart = path[: len(path) - len(basename)]
        stem = re.sub(r"(\.[^.]+)?(\.gz)?$", "", basename)
        outfilename = dirpart + stem + suffix
    if use_gzip and not outfilename.endswith(".gz"):
        outfilename += ".gz"

    if not force and os.path.exists(outfilename):
        click.echo(f"Not overwriting {outfilename} (use --force to override)")
        return None

    opener = gzip.open if use_gzip else open
    return opener(outfilename, "wt", encoding="utf-8")
||
218 | |||
219 | |||
def show_hits(
    hits: SuggestionResult,
    project: AnnifProject,
    lang: str,
    file: io.TextIOWrapper | None = None,
) -> None:
    """
    Print subject suggestions to the console or a file, one line per hit. The fields
    of a line are separated by tabs: the URI in angle brackets, the label in the
    specified language, the notation if the subject has one, and the score with four
    decimals.
    """
    row_template = "<{}>\t{}\t{:.04f}"
    for hit in hits:
        subject = project.subjects[hit.subject_id]
        uri = subject.uri
        # Falsy values (e.g. a missing notation) are left out of the line
        label_fields = [
            field for field in (subject.labels[lang], subject.notation) if field
        ]
        row = row_template.format(uri, "\t".join(label_fields), hit.score)
        click.echo(row, file=file)
||
240 | |||
241 | |||
def parse_backend_params(
    backend_param: tuple[str, ...] | tuple[()], project: AnnifProject
) -> collections.defaultdict[str, dict[str, str]]:
    """Turn ``--backend-param`` values into a nested dict.

    Each value has the form ``<backend>.<parameter>=<value>``. The result maps
    the backend name to a dict of its parameter names and values. Every value
    is checked with ``_validate_backend_params`` before it is stored.
    """
    parsed = collections.defaultdict(dict)
    for option in backend_param:
        # Split only on the first "." and "=", so values may contain both
        backend_name, assignment = option.split(".", 1)
        param_name, param_value = assignment.split("=", 1)
        _validate_backend_params(backend_name, option, project)
        parsed[backend_name][param_name] = param_value
    return parsed
||
254 | |||
255 | |||
def _validate_backend_params(backend: str, beparam: str, project: AnnifProject) -> None:
    """Check that a backend named in a ``-b`` option is the project's backend.

    Raises ConfigurationException when ``backend`` differs from the
    ``backend`` setting in the project configuration.
    """
    expected_backend = project.config["backend"]
    if backend == expected_backend:
        return
    message = (
        'The backend {} in CLI option "-b {}" not matching the project'
        " backend {}."
    ).format(backend, beparam, expected_backend)
    raise ConfigurationException(message)
||
262 | |||
263 | |||
def parse_metadata(metadata: tuple[str, ...] | tuple[()]) -> dict[str, str]:
    """Turn ``--metadata`` values of the form ``<key>=<value>`` into a dict.

    Only the first "=" separates key and value, so values may contain "=".
    Raises click.BadParameter for an item that has no "=" at all.
    """

    parsed = {}
    for item in metadata:
        key, separator, value = item.partition("=")
        if not separator:
            raise click.BadParameter(f"--metadata '{item}'. Expected <key>=<value>.")
        parsed[key] = value

    return parsed
||
276 | |||
277 | |||
def generate_filter_params(filter_batch_max_limit: int) -> list[tuple[int, float]]:
    """Return all (limit, threshold) combinations to try.

    Limits run from 1 to filter_batch_max_limit inclusive, and thresholds are
    the twenty multiples of 0.05 starting from 0. The pairs are ordered by
    limit first and threshold second.
    """
    thresholds = [step * 0.05 for step in range(20)]
    return [
        (limit, threshold)
        for limit in range(1, filter_batch_max_limit + 1)
        for threshold in thresholds
    ]
||
282 | |||
283 | |||
def _get_completion_choices(
    param: Argument,
) -> dict[str, AnnifVocabulary] | dict[str, AnnifProject] | list:
    """Return the shell completion candidates for the given parameter.

    Project parameters get the projects from the registry and the vocabulary
    parameter gets the vocabularies; any other parameter gets an empty list.
    """
    name = param.name
    if name in ("project_id", "project_ids_pattern"):
        return annif.registry.get_projects()
    if name == "vocab_id":
        return annif.registry.get_vocabs()
    return []
||
293 | |||
294 | |||
def complete_param(ctx: Context, param: Argument, incomplete: str) -> list[str]:
    """Shell completion callback for project and vocabulary parameters.

    Loads the application, and within its context returns the candidates from
    ``_get_completion_choices`` that start with the text typed so far.
    """
    app = ctx.obj.load_app()
    with app.app_context():
        matches = []
        for candidate in _get_completion_choices(param):
            if candidate.startswith(incomplete):
                matches.append(candidate)
        return matches
||
302 |