Passed
Pull Request — master (#461)
by
unknown
01:41
created

annif.backend.yake   A

Complexity

Total Complexity 35

Size/Duplication

Total Lines 181
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 35
eloc 145
dl 0
loc 181
rs 9.6
c 0
b 0
f 0

16 Methods

Rating   Name   Duplication   Size   Complexity  
B YakeBackend._create_index() 0 19 6
A YakeBackend.initialize() 0 10 1
A YakeBackend._lemmatize_phrase() 0 6 2
A YakeBackend._sort_phrase() 0 3 1
A YakeBackend.default_params() 0 4 1
A YakeBackend._combine_suggestions() 0 12 3
A YakeBackend._load_index() 0 7 3
A YakeBackend._save_index() 0 5 3
A YakeBackend.graph() 0 10 2
A YakeBackend._keyphrases2suggestions() 0 17 5
A YakeBackend.is_trained() 0 3 1
A YakeBackend._keyphrase2uris() 0 4 1
A YakeBackend._transform_score() 0 3 1
A YakeBackend._conflate_scores() 0 3 1
A YakeBackend._initialize_index() 0 12 3
A YakeBackend._suggest() 0 16 1
1
"""Annif backend using Yake keyword extraction"""
2
# TODO Mention GPLv3 license also here?
3
4
import yake
5
import os.path
6
from collections import defaultdict
7
from rdflib.namespace import SKOS, RDF, OWL
8
import rdflib
9
from . import backend
10
from annif.suggestion import SubjectSuggestion, ListSuggestionResult
11
12
13
class YakeBackend(backend.AnnifBackend):
    """Yake based backend for Annif.

    Suggests subjects by extracting keyphrases from the input text with
    Yake and matching them against an index that maps lemmatized,
    word-sorted vocabulary labels to concept URIs.
    """

    name = "yake"
    needs_subject_index = False

    # defaults for uninitialized instances
    _index = None
    _graph = None
    INDEX_FILE = 'yake-index'

    DEFAULT_PARAMETERS = {
        'max_ngram_size': 4,
        'deduplication_threshold': 0.9,
        'deduplication_algo': 'levs',
        'window_size': 1,
        'num_keywords': 100,
        'features': None,
    }

    def default_params(self):
        """Return this backend's defaults merged over the common Annif
        backend defaults."""
        params = backend.AnnifBackend.DEFAULT_PARAMETERS.copy()
        params.update(self.DEFAULT_PARAMETERS)
        return params

    @property
    def is_trained(self):
        # Yake needs no training: the backend is usable as soon as the
        # vocabulary index has been built.
        return True

    def initialize(self):
        """Load or build the label index and set up the Yake extractor."""
        self._initialize_index()
        # Parameters coming from project configuration are strings, so the
        # numeric ones must be cast explicitly before passing them to yake;
        # the casts are no-ops for the int/float defaults above.
        self._kw_extractor = yake.KeywordExtractor(
            lan=self.project.language,
            n=int(self.params['max_ngram_size']),
            dedupLim=float(self.params['deduplication_threshold']),
            dedupFunc=self.params['deduplication_algo'],
            windowsSize=int(self.params['window_size']),
            top=int(self.params['num_keywords']),
            features=self.params['features'])

    def _initialize_index(self):
        """Load the index from the data directory, creating and saving it
        first if it does not exist yet."""
        if self._index is None:
            path = os.path.join(self.datadir, self.INDEX_FILE)
            if os.path.exists(path):
                self._index = self._load_index(path)
                self.info(
                    f'Loaded index from {path} with {len(self._index)} labels')
            else:
                self.info('Creating index')
                self._create_index()
                self._save_index(path)
                self.info(f'Created index with {len(self._index)} labels')

    @property
    def graph(self):
        """The vocabulary as an rdflib Graph, loaded lazily from the
        project vocabulary's subjects.ttl file."""
        if self._graph is None:
            # TODO use as_graph() that is now available
            # self._graph = vocab.as_graph()
            self._graph = rdflib.Graph()
            path = os.path.join(self.project.vocab.datadir, 'subjects.ttl')
            self.info('Loading graph from {}'.format(path))
            # Graph.load() was removed in rdflib 6; parse() is the
            # equivalent, version-stable API.
            self._graph.parse(path, format=rdflib.util.guess_format(path))
        return self._graph

    def _create_index(self):
        """Build the index mapping lemmatized, word-sorted label phrases to
        sets of concept URIs.

        Deprecated concepts and labels in other languages are skipped.
        """
        # TODO Should index creation & saving be done on loadvoc command?
        # Or saving at all? It takes about 1 min to create the index
        index = defaultdict(set)
        for predicate in [SKOS.prefLabel, SKOS.altLabel, SKOS.hiddenLabel]:
            for concept in self.graph.subjects(RDF.type, SKOS.Concept):
                if (concept, OWL.deprecated, rdflib.Literal(True)) \
                        in self.graph:
                    continue
                for label in self.graph.objects(concept, predicate):
                    if label.language != self.project.language:
                        continue
                    lemmatized_label = self._sort_phrase(
                        self._lemmatize_phrase(str(label)))
                    index[lemmatized_label].add(str(concept))
        index.pop('', None)  # Remove possible empty string entry
        self._index = dict(index)

    def _save_index(self, path):
        """Write the index to path as tab-separated label/URIs lines."""
        with open(path, 'w', encoding='utf-8') as indexfile:
            for label, uris in self._index.items():
                line = label + '\t' + ' '.join(uris)
                print(line, file=indexfile)

    def _load_index(self, path):
        """Read back an index previously written by _save_index()."""
        index = dict()
        with open(path, 'r', encoding='utf-8') as indexfile:
            for line in indexfile:
                label, uris = line.strip().split('\t')
                index[label] = uris.split()
        return index

    def _sort_phrase(self, phrase):
        """Return the words of phrase in sorted order so that word order
        does not affect index lookups."""
        return ' '.join(sorted(phrase.split()))

    def _lemmatize_phrase(self, phrase):
        """Normalize every word of phrase with the project analyzer and
        lowercase it."""
        return ' '.join(
            self.project.analyzer.normalize_word(word).lower()
            for word in phrase.split())

    def _keyphrases2suggestions(self, keyphrases):
        """Convert Yake (keyphrase, score) pairs into (uri, label, score)
        suggestions, merging entries that resolve to the same URI."""
        suggestions = []
        not_matched = []
        for kp, score in keyphrases:
            uris = self._keyphrase2uris(kp)
            for uri in uris:
                label = self.project.subjects.uris_to_labels([uri])[0]
                suggestions.append(
                    (uri, label, self._transform_score(score)))
            if not uris:
                not_matched.append((kp, self._transform_score(score)))
        # Remove duplicate uris, combining the scores
        suggestions = self._combine_suggestions(suggestions)
        self.debug('Keyphrases not matched:\n' + '\t'.join(
            [x[0] + ' ' + str(x[1]) for x
             in sorted(not_matched, reverse=True, key=lambda x: x[1])]))
        return suggestions

    def _keyphrase2uris(self, keyphrase):
        """Look up the URIs indexed for keyphrase; empty if none match."""
        keyphrase = self._sort_phrase(self._lemmatize_phrase(keyphrase))
        return self._index.get(keyphrase, [])

    def _transform_score(self, score):
        """Map a Yake score (lower is better) into (0, 1] (higher is
        better)."""
        # Yake can emit negative scores on long documents; clamp at zero so
        # the transformed score never exceeds 1.0.
        score = max(score, 0)
        return 1.0 / (score + 1)

    def _combine_suggestions(self, suggestions):
        """Merge suggestions that share a URI, conflating their scores."""
        combined_suggestions = {}
        for uri, label, score in suggestions:
            if uri not in combined_suggestions:
                combined_suggestions[uri] = (label, score)
            else:
                old_score = combined_suggestions[uri][1]
                conflated_score = self._conflate_scores(score, old_score)
                combined_suggestions[uri] = (label, conflated_score)
        return [(uri, *label_score) for uri, label_score
                in combined_suggestions.items()]

    def _conflate_scores(self, score1, score2):
        """Combine two probability-like scores into a single score.

        See https://stats.stackexchange.com/questions/194878/combining-two-probability-scores/194884
        """
        return score1 * score2 / (score1 * score2 + (1-score1) * (1-score2))

    def _suggest(self, text, params):
        """Extract keyphrases from text with Yake and return the matching
        subjects as a suggestion result."""
        self.debug(
            f'Suggesting subjects for text "{text[:20]}..." (len={len(text)})')
        limit = int(params['limit'])

        keywords = self._kw_extractor.extract_keywords(text)
        suggestions = self._keyphrases2suggestions(keywords)

        subject_suggestions = [SubjectSuggestion(
                uri=uri,
                label=label,
                notation=None,  # TODO Should notation be fetched to here?
                score=score)
                for uri, label, score in suggestions[:limit] if score > 0.0]
        return ListSuggestionResult.create_from_index(subject_suggestions,
                                                      self.project.subjects)
181