Passed
Pull Request — master (#461)
by
unknown
01:59
created

annif.backend.yake   A

Complexity

Total Complexity 39

Size/Duplication

Total Lines 192
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 158
dl 0
loc 192
rs 9.28
c 0
b 0
f 0
wmc 39

17 Methods

Rating   Name   Duplication   Size   Complexity  
B YakeBackend._create_index() 0 19 6
A YakeBackend.initialize() 0 10 1
A YakeBackend._lemmatize_phrase() 0 6 2
A YakeBackend._sort_phrase() 0 3 1
A YakeBackend.default_params() 0 4 1
A YakeBackend._combine_suggestions() 0 10 3
A YakeBackend._load_index() 0 7 3
A YakeBackend._save_index() 0 5 3
A YakeBackend.graph() 0 6 2
A YakeBackend._keyphrases2suggestions() 0 16 5
A YakeBackend.label_types() 0 14 3
A YakeBackend.is_trained() 0 3 1
A YakeBackend._keyphrase2uris() 0 4 1
A YakeBackend._transform_score() 0 5 2
A YakeBackend._conflate_scores() 0 2 1
A YakeBackend._initialize_index() 0 12 3
A YakeBackend._suggest() 0 16 1
1
"""Annif backend using Yake keyword extraction"""
2
# TODO Mention GPLv3 license also here?
3
4
import yake
5
import os.path
6
from collections import defaultdict
7
from rdflib.namespace import SKOS, RDF, OWL
8
import rdflib
9
from . import backend
10
from annif.suggestion import SubjectSuggestion, ListSuggestionResult
11
from annif.exception import ConfigurationException
12
13
14
class YakeBackend(backend.AnnifBackend):
    """Yake based backend for Annif"""
    name = "yake"
    # subjects are matched directly against vocabulary labels, so no
    # subject index from training is needed (see is_trained below)
    needs_subject_index = False

    # defaults for uninitialized instances
    _index = None   # label -> URIs mapping, built/loaded lazily
    _graph = None   # vocabulary as an rdflib graph, loaded lazily
    INDEX_FILE = 'yake-index'

    # passed to yake.KeywordExtractor in initialize(); values overridable
    # via project configuration
    DEFAULT_PARAMETERS = {
        'max_ngram_size': 4,
        'deduplication_threshold': 0.9,
        'deduplication_algo': 'levs',
        'window_size': 1,
        'num_keywords': 100,
        'features': None,
        'default_label_types': ['pref', 'alt']
    }
33
34
    def default_params(self):
        """Return the generic backend defaults overlaid with this
        backend's own defaults."""
        params = dict(backend.AnnifBackend.DEFAULT_PARAMETERS)
        params.update(self.DEFAULT_PARAMETERS)
        return params
38
39
    @property
    def is_trained(self):
        """Always True: this backend has no separate training step."""
        return True
42
43
    @property
    def label_types(self):
        """Return the SKOS label properties to index.

        Uses the comma-separated label_types parameter if configured,
        otherwise the default_label_types parameter.

        Raises ConfigurationException on an unrecognized label type name."""
        mapping = {'pref': SKOS.prefLabel,
                   'alt': SKOS.altLabel,
                   'hidden': SKOS.hiddenLabel}
        if 'label_types' in self.params:
            lt_entries = self.params['label_types'].split(',')
            try:
                return [mapping[lt.strip()] for lt in lt_entries]
            except KeyError as err:
                # chain the original KeyError so the offending name is
                # preserved in the traceback
                raise ConfigurationException(
                    f'invalid label type {err}',
                    backend_id=self.backend_id) from err
        else:
            return [mapping[lt] for lt in self.params['default_label_types']]
57
58
    def initialize(self):
        """Load or create the label index and set up the YAKE keyword
        extractor with the configured parameters.

        Parameters overridden in project configuration arrive as strings,
        so the numeric ones are cast explicitly before being handed to
        yake.KeywordExtractor, which expects ints/floats."""
        self._initialize_index()
        self._kw_extractor = yake.KeywordExtractor(
            lan=self.project.language,
            n=int(self.params['max_ngram_size']),
            dedupLim=float(self.params['deduplication_threshold']),
            dedupFunc=self.params['deduplication_algo'],
            windowsSize=int(self.params['window_size']),
            top=int(self.params['num_keywords']),
            features=self.params['features'])
68
69
    def _initialize_index(self):
        """Ensure self._index is populated: load a saved copy from the
        data directory if one exists, otherwise build and persist it."""
        if self._index is not None:
            return  # already initialized
        path = os.path.join(self.datadir, self.INDEX_FILE)
        if os.path.exists(path):
            self._index = self._load_index(path)
            self.info(
                f'Loaded index from {path} with {len(self._index)} labels')
        else:
            self.info('Creating index')
            self._create_index()
            self._save_index(path)
            self.info(f'Created index with {len(self._index)} labels')
81
82
    @property
    def graph(self):
        """The project vocabulary as an rdflib graph, loaded lazily on
        first access and cached."""
        if self._graph is not None:
            return self._graph
        self.info('Loading graph')
        self._graph = self.project.vocab.as_graph()
        return self._graph
88
89
    def _create_index(self):
        """Build the in-memory index mapping lemmatized, word-sorted
        vocabulary labels to sets of concept URIs into self._index.

        Iterates concepts once (instead of once per label type) so the
        deprecation check is evaluated a single time per concept."""
        # TODO Should index creation & saving be done on loadvoc command?
        # Or saving at all? It takes about 1 min to create the index
        index = defaultdict(set)
        label_properties = self.label_types
        for concept in self.graph.subjects(RDF.type, SKOS.Concept):
            # skip deprecated concepts entirely
            if (concept, OWL.deprecated, rdflib.Literal(True)) \
                    in self.graph:
                continue
            uri = str(concept)
            for label_type in label_properties:
                for label in self.graph.objects(concept, label_type):
                    # only index labels in the project language
                    if label.language != self.project.language:
                        continue
                    lemmatized_label = self._sort_phrase(
                        self._lemmatize_phrase(str(label)))
                    index[lemmatized_label].add(uri)
        index.pop('', None)  # Remove possible empty string entry
        self._index = dict(index)
108
109
    def _save_index(self, path):
        """Write self._index to path as a TSV file: one label per line,
        followed by its space-separated URIs."""
        with open(path, 'w', encoding='utf-8') as indexfile:
            for label, uris in self._index.items():
                indexfile.write(label + '\t' + ' '.join(uris) + '\n')
114
115
    def _load_index(self, path):
        """Read a label-to-URIs index from the TSV file at path and
        return it as a dict (label -> list of URIs)."""
        index = {}
        with open(path, 'r', encoding='utf-8') as indexfile:
            for row in indexfile:
                label, uris = row.strip().split('\t')
                index[label] = uris.split()
        return index
122
123
    def _sort_phrase(self, phrase):
        """Return the phrase with its words rearranged into sorted order,
        to make word order irrelevant in index lookups."""
        return ' '.join(sorted(phrase.split()))
126
127
    def _lemmatize_phrase(self, phrase):
        """Return the phrase with every word normalized by the project
        analyzer and lowercased."""
        normalize = self.project.analyzer.normalize_word
        return ' '.join(
            normalize(word).lower() for word in phrase.split())
133
134
    def _keyphrases2suggestions(self, keyphrases):
        """Turn (keyphrase, yake-score) pairs into (uri, score) pairs.

        Keyphrases that match no index entry are logged at debug level;
        duplicate URIs are merged with conflated scores."""
        suggestions = []
        not_matched = []
        for keyphrase, yake_score in keyphrases:
            uris = self._keyphrase2uris(keyphrase)
            for uri in uris:
                suggestions.append(
                    (uri, self._transform_score(yake_score)))
            if not uris:
                not_matched.append(
                    (keyphrase, self._transform_score(yake_score)))
        # Remove duplicate uris, conflating the scores
        suggestions = self._combine_suggestions(suggestions)
        unmatched_by_score = sorted(
            not_matched, reverse=True, key=lambda kp: kp[1])
        self.debug('Keyphrases not matched:\n' + '\t'.join(
            kp[0] + ' ' + str(kp[1]) for kp in unmatched_by_score))
        return suggestions
150
151
    def _keyphrase2uris(self, keyphrase):
        """Look up the URIs indexed for a keyphrase, after lemmatizing
        and word-sorting it the same way the index keys were built.
        Returns an empty list when there is no match."""
        normalized = self._sort_phrase(self._lemmatize_phrase(keyphrase))
        return self._index.get(normalized, [])
155
156
    def _transform_score(self, score):
        """Map a YAKE score (lower is better, usually >= 0) onto (0, 1]
        where higher is better."""
        if score < 0:
            # clamp: a zero YAKE score transforms to 1.0
            self.debug(f'Replacing negative YAKE score {score} with zero')
            return 1.0
        return 1.0 / (1.0 + score)
161
162
    def _combine_suggestions(self, suggestions):
        """Merge (uri, score) pairs so each URI appears once; scores of
        repeated URIs are conflated pairwise."""
        merged = {}
        for uri, score in suggestions:
            if uri in merged:
                merged[uri] = self._conflate_scores(score, merged[uri])
            else:
                merged[uri] = score
        return list(merged.items())
172
173
    def _conflate_scores(self, score1, score2):
        """Combine two probability-like scores into one using the
        independent-evidence product rule."""
        joint = score1 * score2
        return joint / (joint + (1 - score1) * (1 - score2))
175
176
    def _suggest(self, text, params):
        """Extract keyphrases from text with YAKE and return the matching
        subjects (up to the limit parameter) as a suggestion result."""
        self.debug(
            f'Suggesting subjects for text "{text[:20]}..." (len={len(text)})')
        limit = int(params['limit'])

        keyphrases = self._kw_extractor.extract_keywords(text)
        suggestions = self._keyphrases2suggestions(keyphrases)

        subject_suggestions = []
        for uri, score in suggestions[:limit]:
            if score > 0.0:  # drop zero-score matches
                subject_suggestions.append(SubjectSuggestion(
                    uri=uri,
                    label=None,
                    notation=None,
                    score=score))
        return ListSuggestionResult.create_from_index(
            subject_suggestions, self.project.subjects)
192