Passed
Pull Request — master (#461)
by
unknown
03:34
created

annif.backend.yake   A

Complexity

Total Complexity 37

Size/Duplication

Total Lines 189
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 155
dl 0
loc 189
rs 9.44
c 0
b 0
f 0
wmc 37

18 Methods

Rating   Name   Duplication   Size   Complexity  
A YakeBackend._create_index() 0 11 3
A YakeBackend.initialize() 0 2 1
A YakeBackend._lemmatize_phrase() 0 6 2
A YakeBackend._sort_phrase() 0 3 1
A YakeBackend.default_params() 0 4 1
A YakeBackend._load_index() 0 7 3
A YakeBackend._validate_label_types() 0 5 3
A YakeBackend._save_index() 0 5 3
A YakeBackend.label_types() 0 9 2
A YakeBackend.is_trained() 0 3 1
A YakeBackend._initialize_index() 0 12 3
A YakeBackend._normalize_label() 0 6 2
A YakeBackend._keyphrases2suggestions() 0 16 5
A YakeBackend._keyphrase2uris() 0 4 1
A YakeBackend._suggest() 0 24 1
A YakeBackend._combine_scores() 0 6 1
A YakeBackend._combine_suggestions() 0 10 3
A YakeBackend._transform_score() 0 3 1
1
"""Annif backend using Yake keyword extraction"""
2
# TODO Mention GPLv3 license also here?
3
4
import yake
5
import os.path
6
import re
7
from collections import defaultdict
8
from rdflib.namespace import SKOS
9
import annif.util
10
from . import backend
11
from annif.suggestion import SubjectSuggestion, ListSuggestionResult
12
from annif.exception import ConfigurationException
13
14
15
class YakeBackend(backend.AnnifBackend):
16
    """Yake based backend for Annif"""
17
    name = "yake"
18
    needs_subject_index = False
19
20
    # defaults for uninitialized instances
21
    _index = None
22
    _graph = None
23
    INDEX_FILE = 'yake-index'
24
25
    DEFAULT_PARAMETERS = {
26
        'max_ngram_size': 4,
27
        'deduplication_threshold': 0.9,
28
        'deduplication_algo': 'levs',
29
        'window_size': 1,
30
        'num_keywords': 100,
31
        'features': None,
32
        'label_types': ['prefLabel', 'altLabel'],
33
        'remove_parentheses': False
34
    }
35
36
    def default_params(self):
37
        params = backend.AnnifBackend.DEFAULT_PARAMETERS.copy()
38
        params.update(self.DEFAULT_PARAMETERS)
39
        return params
40
41
    @property
42
    def is_trained(self):
43
        return True
44
45
    @property
46
    def label_types(self):
47
        if type(self.params['label_types']) == str:  # Label types set by user
48
            label_types = [lt.strip() for lt
49
                           in self.params['label_types'].split(',')]
50
            self._validate_label_types(label_types)
51
        else:
52
            label_types = self.params['label_types']  # The defaults
53
        return [getattr(SKOS, lt) for lt in label_types]
54
55
    def _validate_label_types(self, label_types):
56
        for lt in label_types:
57
            if lt not in ('prefLabel', 'altLabel', 'hiddenLabel'):
58
                raise ConfigurationException(
59
                    f'invalid label type {lt}', backend_id=self.backend_id)
60
61
    def initialize(self):
62
        self._initialize_index()
63
64
    def _initialize_index(self):
65
        if self._index is None:
66
            path = os.path.join(self.datadir, self.INDEX_FILE)
67
            if os.path.exists(path):
68
                self._index = self._load_index(path)
69
                self.info(
70
                    f'Loaded index from {path} with {len(self._index)} labels')
71
            else:
72
                self.info('Creating index')
73
                self._index = self._create_index()
74
                self._save_index(path)
75
                self.info(f'Created index with {len(self._index)} labels')
76
77
    def _save_index(self, path):
78
        with open(path, 'w', encoding='utf-8') as indexfile:
79
            for label, uris in self._index.items():
80
                line = label + '\t' + ' '.join(uris)
81
                print(line, file=indexfile)
82
83
    def _load_index(self, path):
84
        index = dict()
85
        with open(path, 'r', encoding='utf-8') as indexfile:
86
            for line in indexfile:
87
                label, uris = line.strip().split('\t')
88
                index[label] = uris.split()
89
        return index
90
91
    def _create_index(self):
92
        index = defaultdict(set)
93
        for concept in self.project.vocab.skos_concepts:
94
            uri = str(concept)
95
            labels = self.project.vocab.get_skos_concept_labels(
96
                concept, self.label_types, self.params['language'])
97
            for label in labels:
98
                label = self._normalize_label(label)
99
                index[label].add(uri)
100
        index.pop('', None)  # Remove possible empty string entry
101
        return dict(index)
102
103
    def _normalize_label(self, label):
104
        label = str(label)
105
        if annif.util.boolean(self.params['remove_parentheses']):
106
            label = re.sub(r' \(.*\)', '', label)
107
        lemmatized_label = self._lemmatize_phrase(label)
108
        return self._sort_phrase(lemmatized_label)
109
110
    def _lemmatize_phrase(self, phrase):
111
        normalized = []
112
        for word in phrase.split():
113
            normalized.append(
114
                self.project.analyzer.normalize_word(word).lower())
115
        return ' '.join(normalized)
116
117
    def _sort_phrase(self, phrase):
118
        words = phrase.split()
119
        return ' '.join(sorted(words))
120
121
    def _suggest(self, text, params):
122
        self.debug(
123
            f'Suggesting subjects for text "{text[:20]}..." (len={len(text)})')
124
        limit = int(params['limit'])
125
126
        self._kw_extractor = yake.KeywordExtractor(
127
            lan=params['language'],
128
            n=int(params['max_ngram_size']),
129
            dedupLim=float(params['deduplication_threshold']),
130
            dedupFunc=params['deduplication_algo'],
131
            windowsSize=int(params['window_size']),
132
            top=int(params['num_keywords']),
133
            features=self.params['features'])
134
        keyphrases = self._kw_extractor.extract_keywords(text)
135
        suggestions = self._keyphrases2suggestions(keyphrases)
136
137
        subject_suggestions = [SubjectSuggestion(
138
                uri=uri,
139
                label=None,
140
                notation=None,
141
                score=score)
142
                for uri, score in suggestions[:limit] if score > 0.0]
143
        return ListSuggestionResult.create_from_index(subject_suggestions,
144
                                                      self.project.subjects)
145
146
    def _keyphrases2suggestions(self, keyphrases):
147
        suggestions = []
148
        not_matched = []
149
        for kp, score in keyphrases:
150
            uris = self._keyphrase2uris(kp)
151
            for uri in uris:
152
                suggestions.append(
153
                    (uri, self._transform_score(score)))
154
            if not uris:
155
                not_matched.append((kp, self._transform_score(score)))
156
        # Remove duplicate uris, conflating the scores
157
        suggestions = self._combine_suggestions(suggestions)
158
        self.debug('Keyphrases not matched:\n' + '\t'.join(
159
            [kp[0] + ' ' + str(kp[1]) for kp
160
             in sorted(not_matched, reverse=True, key=lambda kp: kp[1])]))
161
        return suggestions
162
163
    def _keyphrase2uris(self, keyphrase):
164
        keyphrase = self._lemmatize_phrase(keyphrase)
165
        keyphrase = self._sort_phrase(keyphrase)
166
        return self._index.get(keyphrase, [])
167
168
    def _transform_score(self, score):
169
        score = max(score, 0)
170
        return 1.0 / (score + 1)
171
172
    def _combine_suggestions(self, suggestions):
173
        combined_suggestions = {}
174
        for uri, score in suggestions:
175
            if uri not in combined_suggestions:
176
                combined_suggestions[uri] = score
177
            else:
178
                old_score = combined_suggestions[uri]
179
                combined_suggestions[uri] = self._combine_scores(
180
                    score, old_score)
181
        return list(combined_suggestions.items())
182
183
    def _combine_scores(self, score1, score2):
184
        # The result is never smaller than the greater input
185
        score1 = score1/2 + 0.5
186
        score2 = score2/2 + 0.5
187
        confl = score1 * score2 / (score1 * score2 + (1-score1) * (1-score2))
188
        return (confl-0.5) * 2
189