Passed
Pull Request — master (#461)
by
unknown
02:00
created

annif.backend.yake   A

Complexity

Total Complexity 37

Size/Duplication

Total Lines 192
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 156
dl 0
loc 192
rs 9.44
c 0
b 0
f 0
wmc 37

18 Methods

Rating   Name   Duplication   Size   Complexity  
A YakeBackend._create_index() 0 12 3
A YakeBackend.initialize() 0 2 1
A YakeBackend._lemmatize_phrase() 0 6 2
A YakeBackend._sort_phrase() 0 3 1
A YakeBackend.default_params() 0 4 1
A YakeBackend._combine_scores() 0 6 1
A YakeBackend._combine_suggestions() 0 10 3
A YakeBackend._load_index() 0 7 3
A YakeBackend._validate_label_types() 0 5 3
A YakeBackend._save_index() 0 5 3
A YakeBackend._keyphrases2suggestions() 0 16 5
A YakeBackend.label_types() 0 9 2
A YakeBackend.is_trained() 0 3 1
A YakeBackend._keyphrase2uris() 0 4 1
A YakeBackend._transform_score() 0 3 1
A YakeBackend._initialize_index() 0 12 3
A YakeBackend._suggest() 0 24 1
A YakeBackend._normalize_label() 0 6 2
1
"""Annif backend using Yake keyword extraction"""
2
# Enabling this optional backend may result in GPLv3 terms to cover the
3
# application, because YAKE (https://github.com/LIAAD/yake) is licensed under
4
# GPLv3.
5
6
import yake
7
import os.path
8
import re
9
from collections import defaultdict
10
from rdflib.namespace import SKOS
11
import annif.util
12
from . import backend
13
from annif.suggestion import SubjectSuggestion, ListSuggestionResult
14
from annif.exception import ConfigurationException
15
16
17
class YakeBackend(backend.AnnifBackend):
    """Yake based backend for Annif"""
    name = "yake"
    needs_subject_index = False

    # defaults for uninitialized instances
    _index = None
    _graph = None
    INDEX_FILE = 'yake-index'

    DEFAULT_PARAMETERS = {
        'max_ngram_size': 4,
        'deduplication_threshold': 0.9,
        'deduplication_algo': 'levs',
        'window_size': 1,
        'num_keywords': 100,
        'features': None,
        'label_types': ['prefLabel', 'altLabel'],
        'remove_parentheses': False
    }

    def default_params(self):
        """Return default parameters: the common backend defaults overlaid
        with the YAKE-specific ones."""
        params = backend.AnnifBackend.DEFAULT_PARAMETERS.copy()
        params.update(self.DEFAULT_PARAMETERS)
        return params

    @property
    def is_trained(self):
        # YAKE is an unsupervised extractor, so no training is required
        return True

    @property
    def label_types(self):
        """Return the SKOS label properties to index, as rdflib terms.

        Raises ConfigurationException for unknown label type names."""
        if isinstance(self.params['label_types'], str):
            # Label types set by the user as a comma-separated string
            label_types = [lt.strip() for lt
                           in self.params['label_types'].split(',')]
            self._validate_label_types(label_types)
        else:
            label_types = self.params['label_types']  # The defaults
        return [getattr(SKOS, lt) for lt in label_types]

    def _validate_label_types(self, label_types):
        """Raise ConfigurationException if any label type is not a known
        SKOS label property name."""
        for lt in label_types:
            if lt not in ('prefLabel', 'altLabel', 'hiddenLabel'):
                raise ConfigurationException(
                    f'invalid label type {lt}', backend_id=self.backend_id)

    def initialize(self):
        self._initialize_index()

    def _initialize_index(self):
        """Load the label-to-URIs index from disk, creating and saving it
        first if it does not exist yet. No-op if already initialized."""
        if self._index is None:
            path = os.path.join(self.datadir, self.INDEX_FILE)
            if os.path.exists(path):
                self._index = self._load_index(path)
                self.info(
                    f'Loaded index from {path} with {len(self._index)} labels')
            else:
                self.info('Creating index')
                self._index = self._create_index()
                self._save_index(path)
                self.info(f'Created index with {len(self._index)} labels')

    def _save_index(self, path):
        """Write the index to *path*, one line per label:
        label<TAB>space-separated-URIs."""
        with open(path, 'w', encoding='utf-8') as indexfile:
            for label, uris in self._index.items():
                line = label + '\t' + ' '.join(uris)
                print(line, file=indexfile)

    def _load_index(self, path):
        """Read an index previously written by _save_index and return it as
        a dict mapping label -> list of URIs."""
        index = dict()
        with open(path, 'r', encoding='utf-8') as indexfile:
            for line in indexfile:
                label, uris = line.strip().split('\t')
                index[label] = uris.split()
        return index

    def _create_index(self):
        """Build the index from the project vocabulary: map each normalized
        concept label to the set of concept URIs carrying it."""
        index = defaultdict(set)
        concepts = self.project.vocab.skos_vocab.concepts
        for concept in concepts:
            uri = str(concept)
            labels = self.project.vocab.skos_vocab.get_concept_labels(
                concept, self.label_types, self.params['language'])
            for label in labels:
                label = self._normalize_label(label)
                index[label].add(uri)
        index.pop('', None)  # Remove possible empty string entry
        return dict(index)

    def _normalize_label(self, label):
        """Normalize a label for index lookup: optionally strip a trailing
        parenthesized qualifier, then lemmatize and sort its words."""
        label = str(label)
        if annif.util.boolean(self.params['remove_parentheses']):
            label = re.sub(r' \(.*\)', '', label)
        lemmatized_label = self._lemmatize_phrase(label)
        return self._sort_phrase(lemmatized_label)

    def _lemmatize_phrase(self, phrase):
        """Return *phrase* with every word normalized by the project
        analyzer and lowercased."""
        normalized = []
        for word in phrase.split():
            normalized.append(
                self.project.analyzer.normalize_word(word).lower())
        return ' '.join(normalized)

    def _sort_phrase(self, phrase):
        """Return *phrase* with its words sorted alphabetically, to make
        word order irrelevant in index lookups."""
        words = phrase.split()
        return ' '.join(sorted(words))

    def _suggest(self, text, params):
        self.debug(
            f'Suggesting subjects for text "{text[:20]}..." (len={len(text)})')
        limit = int(params['limit'])

        # A new extractor is built per call because the (possibly
        # request-specific) params may differ between calls.
        kw_extractor = yake.KeywordExtractor(
            lan=params['language'],
            n=int(params['max_ngram_size']),
            dedupLim=float(params['deduplication_threshold']),
            dedupFunc=params['deduplication_algo'],
            windowsSize=int(params['window_size']),
            top=int(params['num_keywords']),
            # Read from the merged params like every other setting above;
            # reading self.params here would ignore per-call overrides.
            features=params['features'])
        keyphrases = kw_extractor.extract_keywords(text)
        suggestions = self._keyphrases2suggestions(keyphrases)

        subject_suggestions = [SubjectSuggestion(
                uri=uri,
                label=None,
                notation=None,
                score=score)
                for uri, score in suggestions[:limit] if score > 0.0]
        return ListSuggestionResult.create_from_index(subject_suggestions,
                                                      self.project.subjects)

    def _keyphrases2suggestions(self, keyphrases):
        """Map (keyphrase, yake_score) pairs to a deduplicated list of
        (uri, transformed_score) pairs, sorted handling left to callers."""
        suggestions = []
        not_matched = []
        for kp, score in keyphrases:
            uris = self._keyphrase2uris(kp)
            for uri in uris:
                suggestions.append(
                    (uri, self._transform_score(score)))
            if not uris:
                not_matched.append((kp, self._transform_score(score)))
        # Remove duplicate uris, conflating the scores
        suggestions = self._combine_suggestions(suggestions)
        self.debug('Keyphrases not matched:\n' + '\t'.join(
            [kp[0] + ' ' + str(kp[1]) for kp
             in sorted(not_matched, reverse=True, key=lambda kp: kp[1])]))
        return suggestions

    def _keyphrase2uris(self, keyphrase):
        """Return the URIs indexed under *keyphrase* after normalizing it
        the same way index labels were normalized ([] if no match)."""
        keyphrase = self._lemmatize_phrase(keyphrase)
        keyphrase = self._sort_phrase(keyphrase)
        return self._index.get(keyphrase, [])

    def _transform_score(self, score):
        """Map a YAKE score (lower is better, >= 0) to a confidence in
        (0, 1] where higher is better."""
        score = max(score, 0)
        return 1.0 / (score + 1)

    def _combine_suggestions(self, suggestions):
        """Deduplicate (uri, score) pairs, conflating scores of duplicate
        URIs with _combine_scores. Returns a list of (uri, score)."""
        combined_suggestions = {}
        for uri, score in suggestions:
            if uri not in combined_suggestions:
                combined_suggestions[uri] = score
            else:
                old_score = combined_suggestions[uri]
                combined_suggestions[uri] = self._combine_scores(
                    score, old_score)
        return list(combined_suggestions.items())

    def _combine_scores(self, score1, score2):
        # Conflate two scores in [0, 1] via the odds-product rule on the
        # rescaled interval [0.5, 1].
        # The result is never smaller than the greater input
        score1 = score1/2 + 0.5
        score2 = score2/2 + 0.5
        confl = score1 * score2 / (score1 * score2 + (1-score1) * (1-score2))
        return (confl-0.5) * 2
192