Passed
Pull Request — master (#461)
by
unknown
20:48
created

annif.backend.yake.YakeBackend._load_index()   A

Complexity

Conditions 3

Size

Total Lines 7
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 7
dl 0
loc 7
rs 10
c 0
b 0
f 0
cc 3
nop 2
1
"""Annif backend using Yake keyword extraction"""
2
# TODO Mention GPLv3 license also here?
3
4
import yake
5
import os.path
6
import re
7
from collections import defaultdict
8
from rdflib.namespace import SKOS
9
import annif.util
10
from . import backend
11
from annif.suggestion import SubjectSuggestion, ListSuggestionResult
12
from annif.exception import ConfigurationException
13
14
15
class YakeBackend(backend.AnnifBackend):
    """Yake based backend for Annif"""
    name = "yake"
    needs_subject_index = False

    # defaults for uninitialized instances
    _index = None
    _graph = None
    INDEX_FILE = 'yake-index'

    # Matches every character that is NOT ASCII alphanumeric; used to detect
    # input text with no extractable content. Compiled once at class level
    # instead of on every _suggest() call.
    _NON_ALPHANUM = re.compile('[^a-zA-Z0-9]')

    DEFAULT_PARAMETERS = {
        'max_ngram_size': 4,
        'deduplication_threshold': 0.9,
        'deduplication_algo': 'levs',
        'window_size': 1,
        'num_keywords': 100,
        'features': None,
        'label_types': ['prefLabel', 'altLabel'],
        'remove_parentheses': False
    }

    def default_params(self):
        """Return this backend's defaults merged over the base-class
        defaults."""
        params = backend.AnnifBackend.DEFAULT_PARAMETERS.copy()
        params.update(self.DEFAULT_PARAMETERS)
        return params

    @property
    def is_trained(self):
        # Yake needs no training step; the index is built lazily from the
        # project vocabulary, so the backend is always "trained".
        return True

    @property
    def label_types(self):
        """Return the SKOS label type URIs to index.

        User-supplied label types arrive as a comma-separated string and are
        validated; otherwise the list defaults from DEFAULT_PARAMETERS is
        used as-is."""
        if isinstance(self.params['label_types'], str):
            # Label types set by user
            label_types = [lt.strip() for lt
                           in self.params['label_types'].split(',')]
            self._validate_label_types(label_types)
        else:
            label_types = self.params['label_types']  # The defaults
        return [getattr(SKOS, lt) for lt in label_types]

    def _validate_label_types(self, label_types):
        """Raise ConfigurationException if any label type is unknown."""
        for lt in label_types:
            if lt not in ('prefLabel', 'altLabel', 'hiddenLabel'):
                raise ConfigurationException(
                    f'invalid label type {lt}', backend_id=self.backend_id)

    def initialize(self):
        self._initialize_index()

    def _initialize_index(self):
        """Ensure the label index is available: load it from the data
        directory if a saved copy exists, otherwise create and save it."""
        if self._index is None:
            path = os.path.join(self.datadir, self.INDEX_FILE)
            if os.path.exists(path):
                self._index = self._load_index(path)
                self.info(
                    f'Loaded index from {path} with {len(self._index)} labels')
            else:
                self.info('Creating index')
                self._index = self._create_index()
                self._save_index(path)
                self.info(f'Created index with {len(self._index)} labels')

    def _save_index(self, path):
        """Write the index to *path*, one tab-separated line per label:
        ``label<TAB>uri1 uri2 ...``."""
        with open(path, 'w', encoding='utf-8') as indexfile:
            for label, uris in self._index.items():
                line = label + '\t' + ' '.join(uris)
                print(line, file=indexfile)

    def _load_index(self, path):
        """Read back an index written by _save_index() as a dict mapping a
        normalized label to a list of URIs."""
        index = dict()
        with open(path, 'r', encoding='utf-8') as indexfile:
            for line in indexfile:
                label, uris = line.strip().split('\t')
                index[label] = uris.split()
        return index

    def _create_index(self):
        """Build the label index from the project vocabulary.

        Each concept's selected label types are normalized and mapped to the
        concept URI; a label may map to several URIs."""
        index = defaultdict(set)
        for concept in self.project.vocab.skos_concepts:
            uri = str(concept)
            labels = self.project.vocab.get_skos_concept_labels(
                concept, self.label_types, self.params['language'])
            for label in labels:
                label = self._normalize_label(label)
                index[label].add(uri)
        index.pop('', None)  # Remove possible empty string entry
        return dict(index)

    def _normalize_label(self, label):
        """Normalize a label for index lookup: optionally drop a trailing
        parenthesized qualifier, then lemmatize and word-sort the phrase so
        word order is irrelevant."""
        label = str(label)
        if annif.util.boolean(self.params['remove_parentheses']):
            label = re.sub(r' \(.*\)', '', label)
        lemmatized_label = self._lemmatize_phrase(label)
        return self._sort_phrase(lemmatized_label)

    def _lemmatize_phrase(self, phrase):
        """Lemmatize and lowercase every word of *phrase* using the project
        analyzer."""
        normalized = []
        for word in phrase.split():
            normalized.append(
                self.project.analyzer.normalize_word(word).lower())
        return ' '.join(normalized)

    def _sort_phrase(self, phrase):
        """Return *phrase* with its words in sorted order."""
        words = phrase.split()
        return ' '.join(sorted(words))

    def _suggest(self, text, params):
        """Extract keyphrases from *text* with YAKE and map them to subject
        suggestions via the label index."""
        self.debug(
            f'Suggesting subjects for text "{text[:20]}..." (len={len(text)})')
        limit = int(params['limit'])

        # Text with no alphanumeric content yields no keyphrases; bail out
        # early instead of invoking the extractor.
        if len(self._NON_ALPHANUM.sub('', text)) == 0:
            return ListSuggestionResult([])

        self._kw_extractor = yake.KeywordExtractor(
            lan=params['language'],
            n=int(params['max_ngram_size']),
            dedupLim=float(params['deduplication_threshold']),
            dedupFunc=params['deduplication_algo'],
            windowsSize=int(params['window_size']),
            top=int(params['num_keywords']),
            # Read from the per-call params dict, consistent with the other
            # arguments above; self.params would ignore per-call overrides.
            features=params['features'])
        keyphrases = self._kw_extractor.extract_keywords(text)
        suggestions = self._keyphrases2suggestions(keyphrases)

        subject_suggestions = [SubjectSuggestion(
                uri=uri,
                label=None,
                notation=None,
                score=score)
                for uri, score in suggestions[:limit] if score > 0.0]
        return ListSuggestionResult.create_from_index(subject_suggestions,
                                                      self.project.subjects)

    def _keyphrases2suggestions(self, keyphrases):
        """Convert (keyphrase, yake_score) pairs to (uri, score) suggestions,
        combining duplicate URIs and logging unmatched keyphrases."""
        suggestions = []
        not_matched = []
        for kp, score in keyphrases:
            uris = self._keyphrase2uris(kp)
            for uri in uris:
                suggestions.append(
                    (uri, self._transform_score(score)))
            if not uris:
                not_matched.append((kp, self._transform_score(score)))
        # Remove duplicate uris, conflating the scores
        suggestions = self._combine_suggestions(suggestions)
        self.debug('Keyphrases not matched:\n' + '\t'.join(
            [kp[0] + ' ' + str(kp[1]) for kp
             in sorted(not_matched, reverse=True, key=lambda kp: kp[1])]))
        return suggestions

    def _keyphrase2uris(self, keyphrase):
        """Look up the URIs indexed for a keyphrase, normalized the same way
        as the index labels (lemmatized, word-sorted)."""
        keyphrase = self._lemmatize_phrase(keyphrase)
        keyphrase = self._sort_phrase(keyphrase)
        return self._index.get(keyphrase, [])

    def _transform_score(self, score):
        """Map a YAKE score (lower is better, >= 0 in normal operation) to a
        (0, 1] confidence score. A negative input is treated as zero, which
        maps to the maximum confidence 1.0."""
        if score < 0:
            self.debug(f'Replacing negative YAKE score {score} with zero')
            return 1.0
        return 1.0 / (score + 1)

    def _combine_suggestions(self, suggestions):
        """Merge (uri, score) pairs so each URI appears once, conflating
        scores of duplicates with _combine_scores()."""
        combined_suggestions = {}
        for uri, score in suggestions:
            if uri not in combined_suggestions:
                combined_suggestions[uri] = score
            else:
                old_score = combined_suggestions[uri]
                combined_suggestions[uri] = self._combine_scores(
                    score, old_score)
        return list(combined_suggestions.items())

    def _combine_scores(self, score1, score2):
        # The result is never smaller than the greater input
        score1 = score1/2 + 0.5
        score2 = score2/2 + 0.5
        confl = score1 * score2 / (score1 * score2 + (1-score1) * (1-score2))
        return (confl-0.5) * 2