annif.util.suggestion_to_dict()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 9
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 8
nop 3
dl 0
loc 9
rs 10
c 0
b 0
f 0
1
"""Utility functions for Annif"""
2
3
from __future__ import annotations
4
5
import glob
6
import logging
7
import os
8
import os.path
9
import tempfile
10
from typing import TYPE_CHECKING, Any, Callable
11
12
if TYPE_CHECKING:
13
    from annif.corpus.subject import SubjectIndex
14
    from annif.suggestion import SubjectSuggestion, SuggestionResults
15
16
from annif import logger
17
18
19
class DuplicateFilter(logging.Filter):
    """Filter out log messages that have already been displayed.

    Two records are treated as duplicates when their module, level,
    message and arguments all produce the same fingerprint. Only the
    fingerprints (hashes) are remembered, not the records themselves.
    """

    def __init__(self) -> None:
        super().__init__()
        # Fingerprints of every record this filter has let through so far.
        self.logged = set()

    def filter(self, record: logging.LogRecord) -> bool:
        """Return True the first time a record is seen and False afterwards."""
        try:
            current_log = hash(
                (record.module, record.levelno, record.msg, record.args)
            )
        except TypeError:
            # The message object or one of its arguments is unhashable
            # (e.g. a list or dict was passed to the logging call). Fall back
            # to a fingerprint built from their repr() so that filtering
            # never makes the logging call itself fail.
            current_log = hash(
                (
                    record.module,
                    record.levelno,
                    repr(record.msg),
                    repr(record.args),
                )
            )
        if current_log in self.logged:
            return False
        self.logged.add(current_log)
        return True
32
33
34
def atomic_save(
    obj: Any, dirname: str, filename: str, method: Callable | None = None
) -> None:
    """Save the given object (which must have a .save() method, unless the
    method parameter is given) into the given directory with the given
    filename, using a temporary file and renaming the temporary file to the
    final name.

    If method is given it is called as method(obj, path); otherwise
    obj.save(path) is called. Every file whose name starts with the temporary
    file name is moved to the corresponding final name, so a save routine
    that writes companion files next to the main file is handled as well.
    If saving raises, the temporary files are removed and the exception is
    re-raised."""

    prefix, suffix = os.path.splitext(filename)
    tempfd, tempfilename = tempfile.mkstemp(
        prefix="tmp-" + prefix, suffix=suffix, dir=dirname
    )
    os.close(tempfd)
    # Escape the path before globbing: a directory or file name containing
    # glob metacharacters ("[", "?", "*") would otherwise match nothing and
    # the saved data would never be moved into place.
    temp_pattern = glob.escape(tempfilename) + "*"
    logger.debug("saving %s to temporary file %s", str(obj)[:90], tempfilename)
    try:
        if method is not None:
            method(obj, tempfilename)
        else:
            obj.save(tempfilename)
    except BaseException:
        # Do not leave partially written temporary files behind.
        for leftover in glob.glob(temp_pattern):
            try:
                os.remove(leftover)
            except OSError:
                logger.debug("could not remove temporary file %s", leftover)
        raise

    # The umask can only be read by setting it, so set a throwaway value and
    # restore the real one immediately. It does not vary per file, so this is
    # done once rather than inside the loop.
    umask = os.umask(0o777)
    os.umask(umask)
    target = os.path.join(dirname, filename)
    for fn in glob.glob(temp_pattern):
        newname = fn.replace(tempfilename, target)
        logger.debug("renaming temporary file %s to %s", fn, newname)
        # os.replace overwrites an existing target on every platform, whereas
        # os.rename raises on Windows when the target already exists.
        os.replace(fn, newname)
        # mkstemp creates files readable by the owner only; apply the
        # permissions a regularly created file would get under the umask.
        os.chmod(newname, 0o666 & ~umask)
58
59
60
def cleanup_uri(uri: str) -> str:
    """Strip one pair of enclosing angle brackets from a URI, if present.

    The URI is returned unchanged unless it both starts with "<" and ends
    with ">".
    """
    is_bracketed = uri.startswith("<") and uri.endswith(">")
    return uri[1:-1] if is_bracketed else uri
65
66
67
def parse_sources(sourcedef: str) -> list[tuple[str, float]]:
    """Parse a source definition such as 'src1:1.0,src2' into a list of
    (src_id, weight) tuples.

    A source without an explicit weight gets weight 1.0. The returned
    weights are normalized by dividing each one by the sum of all weights.
    """

    parsed = []
    weight_sum = 0.0
    for entry in sourcedef.strip().split(","):
        src_id, *rest = entry.strip().split(":")
        # Only the first field after the source id is used as the weight.
        weight = float(rest[0]) if rest else 1.0
        parsed.append((src_id, weight))
        weight_sum += weight

    normalized = []
    for src_id, weight in parsed:
        normalized.append((src_id, weight / weight_sum))
    return normalized
83
84
85
def parse_args(param_string: str) -> tuple[list, dict]:
    """Parse a string of comma separated arguments such as '42,43,key=abc' into
    a list of positional args ['42', '43'] and a dict of keyword args
    {'key': 'abc'}.

    All values are returned as strings; no type conversion is performed.
    An item is a keyword argument when it contains "=", and it is split on
    the first "=" only, so the value itself may contain "=" characters
    (e.g. 'expr=a=b' gives {'expr': 'a=b'}). An empty or missing parameter
    string gives an empty list and an empty dict.
    """

    if not param_string:
        return [], {}
    posargs = []
    kwargs = {}
    for p_string in param_string.split(","):
        key, separator, value = p_string.partition("=")
        if separator:
            kwargs[key] = value
        else:
            posargs.append(p_string)
    return posargs, kwargs
101
102
103
def boolean(val: Any) -> bool:
    """Interpret the given value as a boolean.

    The value is converted to a string and lowercased; the result is True
    when that string is '1', 'yes', 'true' or 'on', and False for anything
    else.
    """

    true_values = ("1", "yes", "true", "on")
    normalized = str(val).lower()
    return normalized in true_values
109
110
111
def identity(x: Any) -> Any:
    """Return the argument exactly as it was passed in (a no-op function)."""
    return x
114
115
116
def metric_code(metric: str) -> str:
    """Convert a human-readable metric name into a compact code string.

    Spaces are replaced with underscores and parentheses are removed; all
    other characters are kept as they are.
    """
    # Mapping a character to None in a translation table deletes it.
    table = str.maketrans({" ": "_", "(": None, ")": None})
    return metric.translate(table)
119
120
121
def suggestion_to_dict(
    suggestion: SubjectSuggestion, subject_index: SubjectIndex, language: str
) -> dict[str, str | float | None]:
    """Build a plain dict describing a single suggestion.

    The subject is looked up in subject_index by the suggestion's
    subject_id. The result has the keys "uri", "label" (the subject's label
    in the given language), "notation" and "score".
    """
    subject = subject_index[suggestion.subject_id]
    uri = subject.uri
    label = subject.labels[language]
    notation = subject.notation
    score = suggestion.score
    return dict(uri=uri, label=label, notation=notation, score=score)
131
132
133
def suggestion_results_to_list(
    suggestion_results: SuggestionResults, subjects: SubjectIndex, lang: str
) -> list[dict[str, list]]:
    """Convert suggestion results into a list of plain dicts.

    Each item yielded by suggestion_results is itself iterated, and every
    suggestion in it is converted with suggestion_to_dict. The output holds
    one {"results": [...]} dict per item, in the same order.
    """
    converted = []
    for suggestions in suggestion_results:
        results = []
        for suggestion in suggestions:
            results.append(suggestion_to_dict(suggestion, subjects, lang))
        converted.append({"results": results})
    return converted
145