Passed
Pull Request — main (#681)
by Osma
02:36
created

annif.util   A

Complexity

Total Complexity 20

Size/Duplication

Total Lines 106
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 20
eloc 65
dl 0
loc 106
rs 10
c 0
b 0
f 0

7 Functions

Rating   Name   Duplication   Size   Complexity  
A cleanup_uri() 0 5 3
A parse_sources() 0 16 3
A atomic_save() 0 18 3
A boolean() 0 6 1
A metric_code() 0 3 1
A parse_args() 0 16 5
A identity() 0 3 1

2 Methods

Rating   Name   Duplication   Size   Complexity  
A DuplicateFilter.filter() 0 6 2
A DuplicateFilter.__init__() 0 3 1
1
"""Utility functions for Annif"""
2
3
import glob
4
import logging
5
import os
6
import os.path
7
import tempfile
8
9
from annif import logger
10
11
12
class DuplicateFilter(logging.Filter):
    """Filter out log messages that have already been displayed.

    A record counts as a duplicate when its module, level, message template
    and arguments all match those of a record seen earlier by this filter
    instance. Fingerprints of seen records are kept for the lifetime of the
    filter."""

    def __init__(self):
        super().__init__()
        # fingerprints (hashes) of the records that have been let through
        self.logged = set()

    def filter(self, record):
        """Return True the first time a given record is seen and False for
        every later repetition of it."""

        key = (record.module, record.levelno, record.msg, record.args)
        try:
            fingerprint = hash(key)
        except TypeError:
            # msg or args may be unhashable, e.g. when a dict or a list is
            # passed as a logging argument; fall back to the textual form
            fingerprint = hash(repr(key))
        if fingerprint in self.logged:
            return False
        self.logged.add(fingerprint)
        return True
25
26
27
def atomic_save(obj, dirname, filename, method=None):
    """Save the given object (which must have a .save() method, unless the
    method parameter is given) into the given directory with the given
    filename, using a temporary file and renaming the temporary file to the
    final name.

    If method is given, it is called as method(obj, tempfilename) instead of
    obj.save(tempfilename). Any additional files the save operation creates
    whose names start with the temporary file name are renamed as well,
    keeping their extra suffix. If saving raises an exception, the temporary
    files are removed and the exception is re-raised."""

    prefix, suffix = os.path.splitext(filename)
    tempfd, tempfilename = tempfile.mkstemp(prefix=prefix, suffix=suffix, dir=dirname)
    # only the name is needed; the save method opens the file by itself
    os.close(tempfd)
    # escape the path so that glob metacharacters in it are matched literally
    temp_pattern = glob.escape(tempfilename) + "*"
    logger.debug("saving %s to temporary file %s", str(obj)[:90], tempfilename)
    try:
        if method is not None:
            method(obj, tempfilename)
        else:
            obj.save(tempfilename)
    except BaseException:
        # best-effort cleanup so that failed saves don't leave files behind
        for leftover in glob.glob(temp_pattern):
            try:
                os.remove(leftover)
            except OSError:
                logger.debug("could not remove temporary file %s", leftover)
        raise
    target = os.path.join(dirname, filename)
    for fn in glob.glob(temp_pattern):
        newname = fn.replace(tempfilename, target, 1)
        logger.debug("renaming temporary file %s to %s", fn, newname)
        # os.replace overwrites an existing target on all platforms
        os.replace(fn, newname)
45
46
47
def cleanup_uri(uri):
    """Remove the surrounding angle brackets from a URI, if any.

    The brackets are removed only when the string both starts with '<' and
    ends with '>'; otherwise the URI is returned unchanged."""

    if uri.startswith("<") and uri.endswith(">"):
        return uri[1:-1]
    return uri
52
53
54
def parse_sources(sourcedef):
    """Parse a source definition such as 'src1:1.0,src2' into a list of
    tuples (src_id, weight).

    A source without an explicit weight gets the weight 1.0. The returned
    weights are normalized so that they sum to 1.0. Whitespace around source
    ids and weights is ignored. Raises ValueError if a weight is not a valid
    number and ZeroDivisionError if the weights sum to zero."""

    sources = []
    totalweight = 0.0
    for srcdef in sourcedef.strip().split(","):
        srcval = srcdef.strip().split(":")
        # strip the id so that 'src1 : 2' doesn't give the id 'src1 '
        src_id = srcval[0].strip()
        weight = float(srcval[1]) if len(srcval) > 1 else 1.0
        sources.append((src_id, weight))
        totalweight += weight
    return [(src_id, weight / totalweight) for src_id, weight in sources]
70
71
72
def parse_args(param_string):
    """Parse a string of comma separated arguments such as '42,43,key=abc' into
    a list of positional args ['42', '43'] and a dict of keyword args
    {'key': 'abc'}.

    All values are returned as strings. A parameter is split into key and
    value at its first '=' only, so values may themselves contain '='. An
    empty or None parameter string gives ([], {})."""

    if not param_string:
        return [], {}
    posargs = []
    kwargs = {}
    for p_string in param_string.split(","):
        key, separator, value = p_string.partition("=")
        if separator:
            kwargs[key] = value
        else:
            posargs.append(p_string)
    return posargs, kwargs
88
89
90
def boolean(val):
    """Convert the given value to a boolean True/False value, if it isn't already.

    The value is converted to a string and compared case insensitively.
    True values are '1', 'yes', 'true', and 'on'; everything else is False."""

    return str(val).lower() in ("1", "yes", "true", "on")
96
97
98
def identity(x):
    """Identity function: return the given argument unchanged"""
    return x
101
102
103
def metric_code(metric):
    """Convert a human-readable metric name into a compact code string.

    Spaces are replaced with underscores and parentheses are removed; all
    other characters are kept as they are."""

    return metric.translate(str.maketrans(" ", "_", "()"))
106