Passed
Pull Request — master (#673)
by Juho
02:50
created

annif.util.DuplicateFilter.filter()   A

Complexity

Conditions 2

Size

Total Lines 6
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 6
nop 2
dl 0
loc 6
rs 10
c 0
b 0
f 0
1
"""Utility functions for Annif"""
2
3
import glob
4
import logging
5
import os
6
import os.path
7
import tempfile
8
9
import numpy as np
10
11
from annif import logger
12
from annif.suggestion import VectorSuggestionResult
13
14
15
class DuplicateFilter(logging.Filter):
16
    """Filter out log messages that have already been displayed."""
17
18
    def __init__(self):
19
        super().__init__()
20
        self.logged = set()
21
22
    def filter(self, record):
23
        current_log = hash((record.module, record.levelno, record.msg, record.args))
24
        if current_log not in self.logged:
25
            self.logged.add(current_log)
26
            return True
27
        return False
28
29
30
def atomic_save(obj, dirname, filename, method=None):
31
    """Save the given object (which must have a .save() method, unless the
32
    method parameter is given) into the given directory with the given
33
    filename, using a temporary file and renaming the temporary file to the
34
    final name."""
35
36
    prefix, suffix = os.path.splitext(filename)
37
    tempfd, tempfilename = tempfile.mkstemp(prefix=prefix, suffix=suffix, dir=dirname)
38
    os.close(tempfd)
39
    logger.debug("saving %s to temporary file %s", str(obj)[:90], tempfilename)
40
    if method is not None:
41
        method(obj, tempfilename)
42
    else:
43
        obj.save(tempfilename)
44
    for fn in glob.glob(tempfilename + "*"):
45
        newname = fn.replace(tempfilename, os.path.join(dirname, filename))
46
        logger.debug("renaming temporary file %s to %s", fn, newname)
47
        os.rename(fn, newname)
48
49
50
def cleanup_uri(uri):
51
    """remove angle brackets from a URI, if any"""
52
    if uri.startswith("<") and uri.endswith(">"):
53
        return uri[1:-1]
54
    return uri
55
56
57
def merge_hits(weighted_hits, size):
58
    """Merge hits from multiple sources. Input is a sequence of WeightedSuggestion
59
    objects. The size parameter determines the length of the subject vector.
60
    Returns an SuggestionResult object."""
61
62
    weights = [whit.weight for whit in weighted_hits]
63
    scores = [whit.hits.as_vector(size) for whit in weighted_hits]
64
    result = np.average(scores, axis=0, weights=weights)
65
    return VectorSuggestionResult(result)
66
67
68
def parse_sources(sourcedef):
69
    """parse a source definition such as 'src1:1.0,src2' into a sequence of
70
    tuples (src_id, weight)"""
71
72
    sources = []
73
    totalweight = 0.0
74
    for srcdef in sourcedef.strip().split(","):
75
        srcval = srcdef.strip().split(":")
76
        src_id = srcval[0]
77
        if len(srcval) > 1:
78
            weight = float(srcval[1])
79
        else:
80
            weight = 1.0
81
        sources.append((src_id, weight))
82
        totalweight += weight
83
    return [(srcid, weight / totalweight) for srcid, weight in sources]
84
85
86
def parse_args(param_string):
87
    """Parse a string of comma separated arguments such as '42,43,key=abc' into
88
    a list of positional args [42, 43] and a dict of keyword args {key: abc}"""
89
90
    if not param_string:
91
        return [], {}
92
    posargs = []
93
    kwargs = {}
94
    param_strings = param_string.split(",")
95
    for p_string in param_strings:
96
        parts = p_string.split("=")
97
        if len(parts) == 1:
98
            posargs.append(p_string)
99
        elif len(parts) == 2:
100
            kwargs[parts[0]] = parts[1]
101
    return posargs, kwargs
102
103
104
def boolean(val):
105
    """Convert the given value to a boolean True/False value, if it isn't already.
106
    True values are '1', 'yes', 'true', and 'on' (case insensitive), everything
107
    else is False."""
108
109
    return str(val).lower() in ("1", "yes", "true", "on")
110
111
112
def identity(x):
113
    """Identity function: return the given argument unchanged"""
114
    return x
115
116
117
def metric_code(metric):
118
    """Convert a human-readable metric name into an alphanumeric string"""
119
    return metric.translate(metric.maketrans(" ", "_", "()"))
120