Passed
Push — issue678-refactor-suggestionre... ( d7e7fa...c828bb )
by Osma
05:27 queued 02:16
created

annif.util.merge_hits()   A

Complexity

Conditions 1

Size

Total Lines 12
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 7
nop 1
dl 0
loc 12
rs 10
c 0
b 0
f 0
1
"""Utility functions for Annif"""
2
3
import glob
4
import logging
5
import os
6
import os.path
7
import tempfile
8
9
import numpy as np
10
11
from annif import logger
12
13
14
class DuplicateFilter(logging.Filter):
    """Filter out log messages that have already been displayed.

    A record counts as a duplicate when an earlier record seen by this filter
    instance had the same module, level, message and arguments."""

    def __init__(self):
        super().__init__()
        # hashes identifying the records that have already been let through
        self.logged = set()

    def filter(self, record):
        """Return True the first time a given record is seen and False for
        every later record with the same module, level, message and
        arguments."""
        try:
            current_log = hash(
                (record.module, record.levelno, record.msg, record.args)
            )
        except TypeError:
            # record.msg or record.args may hold unhashable values (e.g. a
            # list or dict passed as a log argument); fall back on their
            # string forms so that the logging call itself does not fail.
            current_log = hash(
                (record.module, record.levelno, str(record.msg), repr(record.args))
            )
        if current_log in self.logged:
            return False
        self.logged.add(current_log)
        return True
27
28
29
def atomic_save(obj, dirname, filename, method=None):
    """Save the given object (which must have a .save() method, unless the
    method parameter is given) into the given directory with the given
    filename, using a temporary file and renaming the temporary file to the
    final name.

    If method is given, it is called as method(obj, path) instead of
    obj.save(path). Every file whose name starts with the temporary file name
    is renamed, keeping whatever extra suffix it has after that name."""

    prefix, suffix = os.path.splitext(filename)
    tempfd, tempfilename = tempfile.mkstemp(prefix=prefix, suffix=suffix, dir=dirname)
    # mkstemp returns an open descriptor; only the file name is needed here
    os.close(tempfd)
    logger.debug("saving %s to temporary file %s", str(obj)[:90], tempfilename)
    if method is not None:
        method(obj, tempfilename)
    else:
        obj.save(tempfilename)
    target = os.path.join(dirname, filename)
    # glob.escape keeps metacharacters in the path (e.g. "[" in a directory
    # name) from being interpreted as a pattern, which would match nothing
    for fn in glob.glob(glob.escape(tempfilename) + "*"):
        # swap only the leading temporary name, keeping any extra suffix
        newname = target + fn[len(tempfilename) :]
        logger.debug("renaming temporary file %s to %s", fn, newname)
        # os.replace overwrites an existing target on all platforms, whereas
        # os.rename fails on Windows when the target already exists
        os.replace(fn, newname)
47
48
49
def cleanup_uri(uri):
    """Return the URI without its enclosing angle brackets; a URI that is not
    wrapped in both '<' and '>' is returned unchanged"""
    is_bracketed = uri.startswith("<") and uri.endswith(">")
    return uri[1:-1] if is_bracketed else uri
54
55
56
def parse_sources(sourcedef):
    """Turn a source definition like 'src1:1.0,src2' into a list of
    (src_id, weight) tuples. A source without an explicit weight gets 1.0,
    and the returned weights are divided by the sum of all weights."""

    parsed = []
    weight_sum = 0.0
    for item in sourcedef.strip().split(","):
        src_id, *rest = item.strip().split(":")
        # only the first field after the source id is used as the weight
        weight = float(rest[0]) if rest else 1.0
        parsed.append((src_id, weight))
        weight_sum += weight
    normalized = []
    for src_id, weight in parsed:
        normalized.append((src_id, weight / weight_sum))
    return normalized
72
73
74
def parse_args(param_string):
    """Parse a string of comma separated arguments such as '42,43,key=abc' into
    a list of positional args ['42', '43'] and a dict of keyword args
    {'key': 'abc'}. All values are returned as strings; no type conversion is
    done. An empty or None param_string gives ([], {})."""

    if not param_string:
        return [], {}
    posargs = []
    kwargs = {}
    for p_string in param_string.split(","):
        # split on the first "=" only, so that a value which itself contains
        # "=" is kept whole instead of the argument being silently dropped
        key, sep, value = p_string.partition("=")
        if sep:
            kwargs[key] = value
        else:
            posargs.append(p_string)
    return posargs, kwargs
90
91
92
def boolean(val):
    """Interpret the given value as a boolean. The value is converted to a
    string and lowercased; the result is True if that string is '1', 'yes',
    'true' or 'on', and False for anything else."""

    truthy = ("1", "yes", "true", "on")
    normalized = str(val).lower()
    return normalized in truthy
98
99
100
def identity(x):
    """Return the argument exactly as it was passed in, without copying or
    modifying it"""
    return x
103
104
105
def metric_code(metric):
    """Turn a human-readable metric name into a code string: spaces become
    underscores and parentheses are removed"""
    table = str.maketrans({" ": "_", "(": None, ")": None})
    return metric.translate(table)
108