NatLibFi /
Annif
| 1 | """Utility functions for Annif CLI commands""" |
||
| 2 | |||
| 3 | from __future__ import annotations |
||
| 4 | |||
| 5 | import collections |
||
| 6 | import gzip |
||
| 7 | import itertools |
||
| 8 | import os |
||
| 9 | import re |
||
| 10 | import sys |
||
| 11 | from contextlib import nullcontext |
||
| 12 | from typing import TYPE_CHECKING, Optional, TextIO |
||
| 13 | |||
| 14 | import click |
||
| 15 | import click_log |
||
| 16 | from flask import current_app |
||
| 17 | |||
| 18 | import annif |
||
| 19 | from annif.exception import ConfigurationException |
||
| 20 | from annif.project import Access |
||
| 21 | |||
| 22 | if TYPE_CHECKING: |
||
| 23 | import io |
||
| 24 | from datetime import datetime |
||
| 25 | |||
| 26 | from click.core import Argument, Context, Option |
||
| 27 | |||
| 28 | from annif.corpus.document import DocumentCorpus, DocumentList |
||
| 29 | from annif.corpus.subject import SubjectIndex |
||
| 30 | from annif.project import AnnifProject |
||
| 31 | from annif.suggestion import SuggestionResult |
||
| 32 | from annif.vocab import AnnifVocabulary |
||
| 33 | |||
| 34 | logger = annif.logger |
||
| 35 | |||
| 36 | |||
def _set_project_config_file_path(
    ctx: Context, param: Option, value: str | None
) -> None:
    """Click option callback that stores the projects configuration path given on
    the command line in the application config, overriding the default path or
    the one given in the environment. A missing or empty value leaves the config
    untouched."""
    app = ctx.obj.load_app()
    # The application is loaded and its context entered even when no value was
    # given, so that the callback always has the same side effects.
    with app.app_context():
        if not value:
            return
        current_app.config["PROJECTS_CONFIG_PATH"] = value
||
| 44 | |||
| 45 | |||
def common_options(f):
    """Decorator that attaches the options shared by all CLI commands: the
    -p/--projects option for the project configuration path and the verbosity
    option provided by click_log."""
    projects_option = click.option(
        "-p",
        "--projects",
        help="Set path to project configuration file or directory",
        type=click.Path(dir_okay=True, exists=True),
        callback=_set_project_config_file_path,
        # The callback stores the path itself, so the command does not receive
        # the value as an argument.
        expose_value=False,
        is_eager=True,
    )
    with_projects = projects_option(f)
    verbosity_option = click_log.simple_verbosity_option(logger)
    return verbosity_option(with_projects)
||
| 58 | |||
| 59 | |||
def project_id(f):
    """Decorator that gives a CLI command a "project_id" argument, with shell
    completion handled by complete_param"""
    add_argument = click.argument("project_id", shell_complete=complete_param)
    return add_argument(f)
||
| 63 | |||
| 64 | |||
def backend_param_option(f):
    """Decorator that gives a CLI command a repeatable -b/--backend-param option
    for overriding backend parameters of the config file"""
    add_option = click.option(
        "--backend-param",
        "-b",
        multiple=True,
        help=(
            "Override backend parameter of the config file. "
            "Syntax: `-b <backend>.<parameter>=<value>`."
        ),
    )
    return add_option(f)
||
| 74 | |||
| 75 | |||
def docs_limit_option(f):
    """Decorator that gives a CLI command a -d/--docs-limit option, a
    non-negative integer capping the number of documents to use (default: no
    limit)"""
    non_negative_int = click.IntRange(0, None)
    add_option = click.option(
        "--docs-limit",
        "-d",
        default=None,
        type=non_negative_int,
        help="Maximum number of documents to use",
    )
    return add_option(f)
||
| 86 | |||
| 87 | |||
def get_project(project_id: str) -> AnnifProject:
    """
    Helper function that looks up a project by its ID in the registry (private
    access level and up). If the lookup raises ValueError, an error message is
    printed to stderr and the process exits with status 1."""
    try:
        project = annif.registry.get_project(project_id, min_access=Access.private)
    except ValueError:
        message = "No projects found with id '{0}'.".format(project_id)
        click.echo(message, err=True)
        sys.exit(1)
    return project
||
| 96 | |||
| 97 | |||
def get_vocab(vocab_id: str) -> AnnifVocabulary:
    """
    Helper function that looks up a vocabulary by its ID in the registry
    (private access level and up). If the lookup raises ValueError, an error
    message is printed to stderr and the process exits with status 1."""
    try:
        vocab = annif.registry.get_vocab(vocab_id, min_access=Access.private)
    except ValueError:
        message = f"No vocabularies found with the id '{vocab_id}'."
        click.echo(message, err=True)
        sys.exit(1)
    return vocab
||
| 107 | |||
| 108 | |||
def make_list_template(*rows) -> str:
    """Helper function that builds a str.format() template for printing rows as
    left-aligned, space-separated columns. Each column is as wide as the longest
    item found in that position among the given rows; no rows gives an empty
    template."""

    widths: dict[int, int] = {}
    for row in rows:
        for column, cell in enumerate(row):
            widths[column] = max(widths.get(column, 0), len(cell))

    # Each field looks like "{0: <12}": positional index, pad with spaces,
    # left-align to the column width.
    fields = (f"{{{column}: <{width}}}" for column, width in widths.items())
    return " ".join(fields)
||
| 125 | |||
| 126 | |||
def format_datetime(dt: datetime | None) -> str:
    """Helper function that renders a datetime in the local time zone as
    "YYYY-MM-DD HH:MM:SS". A missing datetime (None) is rendered as "-"."""
    if dt is not None:
        local_dt = dt.astimezone()
        return local_dt.strftime("%Y-%m-%d %H:%M:%S")
    return "-"
||
| 132 | |||
| 133 | |||
def open_doc_path(path, subject_index, vocab_lang, require_subjects=True):
    """open a single path and return it as a DocumentCorpus; a directory becomes
    a DocumentDirectory, a CSV file a DocumentFileCSV, a JSONL file a
    DocumentFileJSONL and anything else is treated as a TSV file"""
    corpus = annif.corpus

    if os.path.isdir(path):
        return corpus.DocumentDirectory(
            path, subject_index, vocab_lang, require_subjects
        )

    if corpus.DocumentFileCSV.is_csv_file(path):
        return corpus.DocumentFileCSV(path, subject_index, require_subjects)

    if corpus.DocumentFileJSONL.is_jsonl_file(path):
        return corpus.DocumentFileJSONL(
            path, subject_index, vocab_lang, require_subjects
        )

    # Neither a directory, CSV nor JSONL: fall back to the TSV reader.
    return corpus.DocumentFileTSV(path, subject_index, require_subjects)
||
| 148 | |||
| 149 | |||
def open_documents(
    paths: tuple[str, ...],
    subject_index: SubjectIndex,
    vocab_lang: str,
    docs_limit: int | None,
) -> DocumentCorpus:
    """Helper function that opens a document corpus from a list of pathnames.
    Each path is either a CSV, TSV or JSONL file or a directory of TXT or JSON
    files. For corpora with subjects expressed as labels, the given vocabulary
    language will be used to convert subject labels into URIs. With no paths at
    all, a warning is logged and the null device is read instead. The result is
    a DocumentCorpus, wrapped in a LimitingDocumentCorpus when docs_limit is
    set."""

    sources = paths
    if not sources:
        logger.warning("Reading empty file")
        sources = (os.path.devnull,)

    corpora = [open_doc_path(source, subject_index, vocab_lang) for source in sources]
    # A single source is returned as-is; several are merged into one corpus.
    docs = corpora[0] if len(corpora) == 1 else annif.corpus.CombinedCorpus(corpora)

    if docs_limit is None:
        return docs
    return annif.corpus.LimitingDocumentCorpus(docs, docs_limit)
||
| 174 | |||
| 175 | |||
def open_text_documents(paths: tuple[str, ...], docs_limit: int | None) -> DocumentList:
    """
    Helper function that reads plain text documents from the given file paths
    and returns them as a DocumentList whose Documents have no subjects. The
    path "-" stands for standard input. At most docs_limit paths are read (all
    of them when docs_limit is None). Files are read lazily, as the returned
    list is iterated.
    """

    def _read_document(path):
        if path == "-":
            text = sys.stdin.read()
        else:
            # utf-8-sig drops a leading BOM; undecodable bytes are replaced
            # rather than raising.
            with open(path, errors="replace", encoding="utf-8-sig") as docfile:
                text = docfile.read()
        return annif.corpus.Document(text=text, subject_set=None)

    selected = paths[:docs_limit]
    return annif.corpus.DocumentList(_read_document(path) for path in selected)
||
| 194 | |||
| 195 | |||
def get_output_stream(
    path: str, suffix: str, output: Optional[str], use_gzip: bool, force: bool
) -> Optional[TextIO]:
    """Return a writable output stream based on the output option.

    If output is "-", standard output is returned wrapped in a nullcontext, so
    that callers can use it in a with statement without closing it. If output is
    any other non-empty string, it is used as the output file name. Otherwise
    the name is derived from path by replacing the extension of its file name
    (and a trailing ".gz", if any) with suffix. When use_gzip is set, ".gz" is
    appended to the name unless already present and the file is written
    gzip-compressed. Files are opened in text mode with UTF-8 encoding.

    Returns None, after printing a notice, if the output file already exists
    and force is not set."""

    if output == "-":
        return nullcontext(sys.stdout)

    if output:
        outfilename = output
    else:
        # Strip the extension from the file name only. Running the pattern over
        # the whole path would treat a dot in a directory name as the start of
        # an extension (e.g. "my.dir/doc" or "./doc") and cut the path there.
        basename = os.path.basename(path)
        dirpart = path[: len(path) - len(basename)]
        stem = re.sub(r"(\.[^.]+)?(\.gz)?$", "", basename)
        outfilename = dirpart + stem + suffix

    if use_gzip and not outfilename.endswith(".gz"):
        outfilename += ".gz"

    if not force and os.path.exists(outfilename):
        click.echo(f"Not overwriting {outfilename} (use --force to override)")
        return None

    opener = gzip.open if use_gzip else open
    return opener(outfilename, "wt", encoding="utf-8")
||
| 218 | |||
| 219 | |||
def show_hits(
    hits: SuggestionResult,
    project: AnnifProject,
    lang: str,
    file: io.TextIOWrapper | None = None,
) -> None:
    """
    Print subject suggestions to the console or to the given file, one
    tab-separated row per hit. A row holds the subject URI in angle brackets,
    the label in the specified language, the notation (when the subject has
    one) and the score with four decimals.
    """
    for hit in hits:
        subject = project.subjects[hit.subject_id]
        uri = subject.uri
        candidates = (subject.labels[lang], subject.notation)
        # Empty or missing parts (typically the notation) are left out so that
        # no blank column is printed for them.
        label_and_notation = "\t".join(part for part in candidates if part)
        row = "<{}>\t{}\t{:.04f}".format(uri, label_and_notation, hit.score)
        click.echo(row, file=file)
||
| 240 | |||
| 241 | |||
def parse_backend_params(
    backend_param: tuple[str, ...] | tuple[()], project: AnnifProject
) -> collections.defaultdict[str, dict[str, str]]:
    """Parse a list of backend parameters given with the --backend-param
    option into a nested dict structure, keyed first by backend and then by
    parameter name. Each item must have the form <backend>.<parameter>=<value>;
    a malformed item raises click.BadParameter. The backend of every item is
    checked against the project by _validate_backend_params."""
    backend_params = collections.defaultdict(dict)
    for beparam in backend_param:
        # partition() never raises, so a missing "." or "=" can be reported
        # with a clear message instead of an unpacking ValueError.
        backend, dot, param = beparam.partition(".")
        key, equals, val = param.partition("=")
        if not dot or not equals:
            raise click.BadParameter(
                f"--backend-param '{beparam}'. "
                "Expected <backend>.<parameter>=<value>."
            )
        _validate_backend_params(backend, beparam, project)
        backend_params[backend][key] = val
    return backend_params
||
| 254 | |||
| 255 | |||
def _validate_backend_params(backend: str, beparam: str, project: AnnifProject) -> None:
    """Raise ConfigurationException unless the backend named in a -b option
    value is the backend configured for the project"""
    project_backend = project.config["backend"]
    if backend == project_backend:
        return
    raise ConfigurationException(
        'The backend {} in CLI option "-b {}" not matching the project'
        " backend {}.".format(backend, beparam, project_backend)
    )
||
| 262 | |||
| 263 | |||
def parse_metadata(metadata: tuple[str, ...] | tuple[()]) -> dict[str, str]:
    """Parse a list of metadata parameters given with the --metadata
    option into a dictionary. Each item is split at its first "="; an item
    without "=" raises click.BadParameter. If a key is repeated, the last value
    wins."""

    parsed = {}
    for item in metadata:
        key, separator, value = item.partition("=")
        if not separator:
            raise click.BadParameter(f"--metadata '{item}'. Expected <key>=<value>.")
        parsed[key] = value

    return parsed
||
| 276 | |||
| 277 | |||
def generate_filter_params(filter_batch_max_limit: int) -> list[tuple[int, float]]:
    """Return every (limit, threshold) combination, where limit runs from 1 to
    filter_batch_max_limit inclusive and threshold takes the 20 values
    0 * 0.05, 1 * 0.05, ..., 19 * 0.05. Limits vary slowest."""
    thresholds = [step * 0.05 for step in range(20)]
    return [
        (limit, threshold)
        for limit in range(1, filter_batch_max_limit + 1)
        for threshold in thresholds
    ]
||
| 282 | |||
| 283 | |||
def _get_completion_choices(
    param: Argument,
) -> dict[str, AnnifVocabulary] | dict[str, AnnifProject] | list:
    """Return the shell completion candidates for a parameter, chosen by its
    name: the registry's projects for "project_id" and "project_ids_pattern",
    its vocabularies for "vocab_id", and an empty list for anything else"""
    name = param.name
    if name == "vocab_id":
        return annif.registry.get_vocabs()
    if name in ("project_id", "project_ids_pattern"):
        return annif.registry.get_projects()
    return []
||
| 293 | |||
| 294 | |||
def complete_param(ctx: Context, param: Argument, incomplete: str) -> list[str]:
    """Shell completion callback: within the application context, return the
    completion candidates for the parameter that start with the text typed so
    far"""
    with ctx.obj.load_app().app_context():
        candidates = _get_completion_choices(param)
        return [name for name in candidates if name.startswith(incomplete)]
||
| 302 |