Passed
Pull Request — master (#461)
by
unknown
02:15
created

annif.backend.yake   B

Complexity

Total Complexity 46

Size/Duplication

Total Lines 212
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 175
dl 0
loc 212
rs 8.72
c 0
b 0
f 0
wmc 46

20 Methods

Rating   Name   Duplication   Size   Complexity  
A YakeBackend._lemmatize_phrase() 0 6 2
A YakeBackend.default_params() 0 4 1
A YakeBackend.is_trained() 0 3 1
A YakeBackend._initialize_index() 0 12 3
A YakeBackend._get_concept_labels() 0 7 4
A YakeBackend._create_index() 0 12 4
A YakeBackend.initialize() 0 2 1
A YakeBackend._sort_phrase() 0 3 1
A YakeBackend._combine_scores() 0 6 1
A YakeBackend._combine_suggestions() 0 10 3
A YakeBackend._load_index() 0 7 3
A YakeBackend._validate_label_types() 0 5 3
A YakeBackend._save_index() 0 5 3
A YakeBackend.graph() 0 6 2
A YakeBackend._keyphrases2suggestions() 0 16 5
A YakeBackend.label_types() 0 9 2
A YakeBackend._keyphrase2uris() 0 4 1
A YakeBackend._transform_score() 0 5 2
A YakeBackend._suggest() 0 28 2
A YakeBackend._normalize_label() 0 6 2

How to fix   Complexity   

Complexity

Complex classes like annif.backend.yake often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Annif backend using Yake keyword extraction"""
2
# TODO Mention GPLv3 license also here?
3
4
import yake
5
import os.path
6
import re
7
from collections import defaultdict
8
from rdflib.namespace import SKOS, RDF, OWL
9
import rdflib
10
import annif.util
11
from . import backend
12
from annif.suggestion import SubjectSuggestion, ListSuggestionResult
13
from annif.exception import ConfigurationException
14
15
16
class YakeBackend(backend.AnnifBackend):
    """Yake based backend for Annif

    Keyphrases are extracted from the input text with YAKE and matched
    against an index that maps normalized concept labels to concept URIs.
    The index is built from the project vocabulary graph and cached in a
    file in the backend data directory.
    """
    name = "yake"
    needs_subject_index = False

    # defaults for uninitialized instances
    _index = None
    _graph = None
    INDEX_FILE = 'yake-index'

    DEFAULT_PARAMETERS = {
        'max_ngram_size': 4,
        'deduplication_threshold': 0.9,
        'deduplication_algo': 'levs',
        'window_size': 1,
        'num_keywords': 100,
        'features': None,
        'label_types': ['prefLabel', 'altLabel'],
        'remove_parentheses': False
    }

    def default_params(self):
        """Return the generic backend defaults updated with the defaults
        of this backend."""
        params = backend.AnnifBackend.DEFAULT_PARAMETERS.copy()
        params.update(self.DEFAULT_PARAMETERS)
        return params

    @property
    def is_trained(self):
        """Always True."""
        return True

    @property
    def label_types(self):
        """Return the SKOS label properties to index.

        A string value of the 'label_types' parameter is treated as a
        comma-separated list given by the user and is validated; any other
        value is used as is (the built-in default list).

        Raises ConfigurationException for an unknown label type name.
        """
        configured = self.params['label_types']
        if isinstance(configured, str):  # Label types set by user
            label_types = [lt.strip() for lt in configured.split(',')]
            self._validate_label_types(label_types)
        else:
            label_types = configured  # The defaults
        return [getattr(SKOS, lt) for lt in label_types]

    def _validate_label_types(self, label_types):
        """Raise ConfigurationException if any of the given names is not
        one of prefLabel, altLabel or hiddenLabel."""
        for lt in label_types:
            if lt not in ('prefLabel', 'altLabel', 'hiddenLabel'):
                raise ConfigurationException(
                    f'invalid label type {lt}', backend_id=self.backend_id)

    @property
    def graph(self):
        """The vocabulary of the project as a graph, loaded on first
        access and cached on the instance."""
        if self._graph is None:
            self.info('Loading graph')
            self._graph = self.project.vocab.as_graph()
        return self._graph

    def initialize(self):
        """Make sure the label index is available."""
        self._initialize_index()

    def _initialize_index(self):
        """Load the label index from the data directory, or create and
        save it when no index file exists yet. Does nothing if the index
        has already been set."""
        if self._index is not None:
            return
        path = os.path.join(self.datadir, self.INDEX_FILE)
        if os.path.exists(path):
            self._index = self._load_index(path)
            self.info(
                f'Loaded index from {path} with {len(self._index)} labels')
        else:
            self.info('Creating index')
            self._index = self._create_index()
            self._save_index(path)
            self.info(f'Created index with {len(self._index)} labels')

    def _save_index(self, path):
        """Write the index to the given path, one label per line in the
        form ``<label><TAB><uri> <uri> ...``."""
        with open(path, 'w', encoding='utf-8') as indexfile:
            for label, uris in self._index.items():
                # Sort the URIs so that the file content does not depend
                # on set iteration order.
                line = label + '\t' + ' '.join(sorted(uris))
                print(line, file=indexfile)

    def _load_index(self, path):
        """Read an index file written by _save_index and return it as a
        dict mapping each label to a list of URIs."""
        index = {}
        with open(path, 'r', encoding='utf-8') as indexfile:
            for line in indexfile:
                line = line.strip()
                if not line:
                    continue  # tolerate blank lines in the file
                label, uris = line.split('\t')
                index[label] = uris.split()
        return index

    def _create_index(self):
        """Build the index from the vocabulary graph.

        Returns a dict mapping each normalized label to the set of URIs
        of the concepts carrying that label. Concepts marked as
        owl:deprecated are left out.
        """
        index = defaultdict(set)
        for concept in self.graph.subjects(RDF.type, SKOS.Concept):
            if (concept, OWL.deprecated, rdflib.Literal(True)) in self.graph:
                continue
            uri = str(concept)
            labels = self._get_concept_labels(concept, self.label_types)
            for label in labels:
                label = self._normalize_label(label)
                index[label].add(uri)
        index.pop('', None)  # Remove possible empty string entry
        return dict(index)

    def _get_concept_labels(self, concept, label_types):
        """Return the labels of the concept that have one of the given
        label types and whose language equals the 'language' parameter."""
        language = self.params['language']
        return [label
                for label_type in label_types
                for label in self.graph.objects(concept, label_type)
                if label.language == language]

    def _normalize_label(self, label):
        """Return the label as a string with its words lemmatized,
        lowercased and sorted.

        When the 'remove_parentheses' parameter is true, text in
        parentheses preceded by a space is removed first.
        """
        label = str(label)
        if annif.util.boolean(self.params['remove_parentheses']):
            label = re.sub(r' \(.*\)', '', label)
        lemmatized_label = self._lemmatize_phrase(label)
        return self._sort_phrase(lemmatized_label)

    def _lemmatize_phrase(self, phrase):
        """Normalize every whitespace-separated word of the phrase with
        the project analyzer, lowercase it, and join the words with
        single spaces."""
        normalize_word = self.project.analyzer.normalize_word
        return ' '.join(
            normalize_word(word).lower() for word in phrase.split())

    def _sort_phrase(self, phrase):
        """Return the words of the phrase in sorted order, so that the
        word order of a phrase does not affect matching."""
        words = phrase.split()
        return ' '.join(sorted(words))

    def _suggest(self, text, params):
        """Suggest subjects for the text.

        Extracts keyphrases with YAKE using the settings in ``params``,
        maps them to concept URIs and returns at most ``params['limit']``
        of the best scoring ones as a suggestion result.
        """
        self.debug(
            f'Suggesting subjects for text "{text[:20]}..." (len={len(text)})')
        limit = int(params['limit'])

        # Text without a single letter or digit yields no suggestions.
        # str.isalnum() is Unicode-aware, so text written in a non-Latin
        # script is not mistaken for empty input.
        if not any(char.isalnum() for char in text):
            return ListSuggestionResult([])

        # NOTE(review): 'features' is read from self.params while all the
        # other settings come from the params argument - confirm whether
        # this is intentional.
        self._kw_extractor = yake.KeywordExtractor(
            lan=params['language'],
            n=int(params['max_ngram_size']),
            dedupLim=float(params['deduplication_threshold']),
            dedupFunc=params['deduplication_algo'],
            windowsSize=int(params['window_size']),
            top=int(params['num_keywords']),
            features=self.params['features'])
        keyphrases = self._kw_extractor.extract_keywords(text)
        suggestions = self._keyphrases2suggestions(keyphrases)

        # Combining scores can lift a URI above ones seen earlier, so
        # order by score (best first) before cutting to the limit. The
        # sort is stable: equal scores keep their original order.
        suggestions = sorted(
            suggestions, key=lambda suggestion: suggestion[1], reverse=True)

        subject_suggestions = [
            SubjectSuggestion(
                uri=uri,
                label=None,
                notation=None,
                score=score)
            for uri, score in suggestions[:limit] if score > 0.0]
        return ListSuggestionResult.create_from_index(subject_suggestions,
                                                      self.project.subjects)

    def _keyphrases2suggestions(self, keyphrases):
        """Convert (keyphrase, score) pairs into a list of (uri, score)
        pairs with one entry per URI.

        Keyphrases that match no label are only reported in a debug
        message.
        """
        suggestions = []
        not_matched = []
        for kp, score in keyphrases:
            uris = self._keyphrase2uris(kp)
            for uri in uris:
                suggestions.append(
                    (uri, self._transform_score(score)))
            if not uris:
                not_matched.append((kp, self._transform_score(score)))
        # Remove duplicate uris, conflating the scores
        suggestions = self._combine_suggestions(suggestions)
        self.debug('Keyphrases not matched:\n' + '\t'.join(
            [kp[0] + ' ' + str(kp[1]) for kp
             in sorted(not_matched, reverse=True, key=lambda kp: kp[1])]))
        return suggestions

    def _keyphrase2uris(self, keyphrase):
        """Return the URIs indexed under the normalized form of the
        keyphrase, or an empty list when there is no match."""
        keyphrase = self._lemmatize_phrase(keyphrase)
        keyphrase = self._sort_phrase(keyphrase)
        return self._index.get(keyphrase, [])

    def _transform_score(self, score):
        """Map a YAKE score to the range (0, 1] as 1 / (score + 1), so
        that a smaller YAKE score gives a larger result.

        A negative YAKE score is treated as zero, which gives 1.0.
        """
        if score < 0:
            self.debug(f'Replacing negative YAKE score {score} with zero')
            return 1.0
        return 1.0 / (score + 1)

    def _combine_suggestions(self, suggestions):
        """Merge (uri, score) pairs that share a URI into one pair whose
        score is the combination of the individual scores. The result
        keeps the order in which the URIs were first seen."""
        combined_suggestions = {}
        for uri, score in suggestions:
            if uri not in combined_suggestions:
                combined_suggestions[uri] = score
            else:
                old_score = combined_suggestions[uri]
                combined_suggestions[uri] = self._combine_scores(
                    score, old_score)
        return list(combined_suggestions.items())

    def _combine_scores(self, score1, score2):
        """Return the conflation of two scores."""
        # The result is never smaller than the greater input
        score1 = score1/2 + 0.5
        score2 = score2/2 + 0.5
        confl = score1 * score2 / (score1 * score2 + (1-score1) * (1-score2))
        return (confl-0.5) * 2
212