annif.util.boolean()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 6
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 2
dl 0
loc 6
rs 10
c 0
b 0
f 0
cc 1
nop 1
1
"""Utility functions for Annif"""
2
3
from __future__ import annotations
4
5
import glob
6
import logging
7
import os
8
import os.path
9
import tempfile
10
from typing import Any, Callable
11
12
from annif import logger
13
14
15
class DuplicateFilter(logging.Filter):
    """Filter out log messages that have already been displayed.

    A record counts as a duplicate when its module, level, message template
    and arguments match those of a record that this filter instance has
    already let through."""

    def __init__(self) -> None:
        super().__init__()
        # hashes of the records that have already been let through
        self.logged = set()

    def filter(self, record: logging.LogRecord) -> bool:
        """Return True the first time a given record is seen, False after that."""
        try:
            current_log = hash(
                (record.module, record.levelno, record.msg, record.args)
            )
        except TypeError:
            # msg or args may hold unhashable objects (lists, dicts, ...);
            # fall back to their repr so that a logging call never raises
            # just because this filter is installed
            current_log = hash(
                (record.module, record.levelno, repr(record.msg), repr(record.args))
            )
        if current_log in self.logged:
            return False
        self.logged.add(current_log)
        return True
28
29
30
def atomic_save(
    obj: Any, dirname: str, filename: str, method: Callable | None = None
) -> None:
    """Save the given object (which must have a .save() method, unless the
    method parameter is given) into the given directory with the given
    filename, using a temporary file and renaming the temporary file to the
    final name.

    If method is given, it is called as method(obj, tempfilename) instead of
    obj.save(tempfilename). Every file whose name starts with the temporary
    file name is renamed, so savers that write several files with added
    suffixes are handled too. If saving raises, the temporary files are
    removed (best effort) and the exception is re-raised."""

    prefix, suffix = os.path.splitext(filename)
    prefix = "tmp-" + prefix
    tempfd, tempfilename = tempfile.mkstemp(prefix=prefix, suffix=suffix, dir=dirname)
    os.close(tempfd)
    # escape glob metacharacters so that a directory or file name containing
    # e.g. "[" or "*" still matches the files that were actually written
    temp_pattern = glob.escape(tempfilename) + "*"
    logger.debug("saving %s to temporary file %s", str(obj)[:90], tempfilename)
    try:
        if method is not None:
            method(obj, tempfilename)
        else:
            obj.save(tempfilename)
    except BaseException:
        # don't leave half-written temporary files behind after a failed save
        for leftover in glob.glob(temp_pattern):
            try:
                os.remove(leftover)
            except OSError:
                logger.debug("could not remove temporary file %s", leftover)
        raise

    # os.umask() can only be read by setting it, so set and restore it once
    umask = os.umask(0o777)
    os.umask(umask)
    for fn in glob.glob(temp_pattern):
        newname = fn.replace(tempfilename, os.path.join(dirname, filename))
        logger.debug("renaming temporary file %s to %s", fn, newname)
        # os.replace overwrites an existing target on every platform
        os.replace(fn, newname)
        # mkstemp creates private files; apply the permissions a regular
        # newly created file would get under the current umask
        os.chmod(newname, 0o666 & ~umask)
54
55
56
def cleanup_uri(uri: str) -> str:
    """strip one pair of enclosing angle brackets from a URI, if present"""
    is_bracketed = uri.startswith("<") and uri.endswith(">")
    return uri[1:-1] if is_bracketed else uri
61
62
63
def parse_sources(sourcedef: str) -> list[tuple[str, float]]:
    """parse a source definition such as 'src1:1.0,src2' into a sequence of
    tuples (src_id, weight)

    A source without an explicit weight gets the weight 1.0. The returned
    weights are normalized so that they sum to 1. Whitespace around source
    ids and weights is ignored, and empty entries (e.g. from a trailing
    comma or an empty definition) are skipped. A weight that is not a number
    raises ValueError; weights that sum to zero raise ZeroDivisionError."""

    sources = []
    for srcdef in sourcedef.split(","):
        if not srcdef.strip():
            # nothing between the commas: not a source at all
            continue
        # strip each part so that 'src1 : 2' yields the id 'src1', not 'src1 '
        srcval = [part.strip() for part in srcdef.split(":")]
        weight = float(srcval[1]) if len(srcval) > 1 else 1.0
        sources.append((srcval[0], weight))
    totalweight = sum(weight for _, weight in sources)
    return [(srcid, weight / totalweight) for srcid, weight in sources]
79
80
81
def parse_args(param_string: str) -> tuple[list, dict]:
    """Parse a string of comma separated arguments such as '42,43,key=abc' into
    a list of positional args ['42', '43'] and a dict of keyword args
    {'key': 'abc'}.

    All values are returned as strings; no type conversion is done. An item
    is a keyword argument if it contains '='; it is split at the first '='
    only, so the value itself may contain '=' (e.g. 'key=a=b' gives
    {'key': 'a=b'}). An empty or None parameter string gives ([], {})."""

    if not param_string:
        return [], {}
    posargs = []
    kwargs = {}
    for p_string in param_string.split(","):
        key, separator, value = p_string.partition("=")
        if separator:
            kwargs[key] = value
        else:
            posargs.append(p_string)
    return posargs, kwargs
97
98
99
def boolean(val: Any) -> bool:
    """Interpret the given value as a boolean.

    The value is converted to a string and lowercased; the result is True
    only for '1', 'yes', 'true' and 'on'. Any other value gives False."""

    true_values = {"1", "yes", "true", "on"}
    normalized = str(val).lower()
    return normalized in true_values
105
106
107
def identity(x: Any) -> Any:
    """Pass-through function: hand back the argument exactly as received"""
    return x
110
111
112
def metric_code(metric: str) -> str:
    """Convert a human-readable metric name into a compact code: spaces are
    replaced with underscores and parentheses are removed. All other
    characters are kept unchanged."""
    return metric.translate(str.maketrans(" ", "_", "()"))
115