Passed
Pull Request — master (#462) by Osma
created 02:33

annif.backend.mllm   B

Complexity

Total Complexity 50

Size/Duplication

Total Lines 367
Duplicated Lines 0 %

Importance

Changes 0
Metric   Value
eloc     295
dl       0
loc      367
rs       8.4
c        0
b        0
f        0
wmc      50

23 Methods

Rating   Name   Duplication   Size   Complexity  
A MLLMOptimizer._objective() 0 26 3
A MLLMOptimizer._prepare() 0 12 2
A MLLMModel._candidates_to_features() 0 26 2
A MLLMBackend._load_train_data() 0 8 2
A MLLMBackend._generate_candidates() 0 2 1
A MLLMModel.generate_candidates() 0 15 3
A MLLMBackend.initialize() 0 3 2
A MLLMBackend._train() 0 27 2
A MLLMModel.prepare_train() 0 27 3
A MLLMBackend._load_model() 0 9 2
B MLLMModel._prepare_terms() 0 22 7
A MLLMModel.train() 0 4 1
A MLLMOptimizer._postprocess() 0 9 1
A MLLMBackend.default_params() 0 4 1
B MLLMModel._prepare_relations() 0 22 6
A MLLMModel._prediction_to_list() 0 4 1
A MLLMModel._create_classifier() 0 6 1
A MLLMModel._prepare_train_index() 0 18 2
A MLLMModel.predict() 0 6 2
A MLLMBackend._prediction_to_result() 0 7 2
A MLLMBackend.get_hp_optimizer() 0 2 1
A MLLMModel._conflate_matches() 0 17 2
A MLLMBackend._suggest() 0 4 1

How to fix: Complexity

Complex classes like annif.backend.mllm often do a lot of different things. To break such a class down, we need to identify a cohesive component within it. A common way to find such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined which fields and methods belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster to apply.
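For illustration only, here is a minimal sketch of how Extract Class could apply to MLLMModel below: the three SKOS relation matrices built in _prepare_relations and the broader/narrower/related counts computed in _candidates_to_features form one such cohesive component. The SubjectRelations class name and its counts method are hypothetical and not part of this PR; graph and vocab are assumed to be the same objects MLLMModel already receives.

# Hypothetical sketch (not part of the PR): names and signatures are illustrative.
import numpy as np
from rdflib import URIRef
from rdflib.namespace import SKOS
from scipy.sparse import lil_matrix


class SubjectRelations:
    """Cohesive component: SKOS broader/narrower/related adjacency matrices."""

    PROPS = (SKOS.broader, SKOS.narrower, SKOS.related)

    def __init__(self, graph, vocab):
        # mirrors _prepare_relations: one boolean subject-by-subject matrix per property
        n_subj = len(vocab.subjects)
        self._matrices = {prop: lil_matrix((n_subj, n_subj), dtype=bool)
                          for prop in self.PROPS}
        for subj_id, (uri, pref, _) in enumerate(vocab.subjects):
            if pref is None:
                continue  # deprecated subject
            for prop, matrix in self._matrices.items():
                for other in graph.objects(URIRef(uri), prop):
                    other_id = vocab.subjects.by_uri(str(other), warnings=False)
                    if other_id is not None:
                        matrix[subj_id, other_id] = True

    def counts(self, candidate_ids):
        # mirrors the broader/narrower/related sums in _candidates_to_features
        c_vec = np.zeros(self._matrices[SKOS.broader].shape[0], dtype=bool)
        c_vec[candidate_ids] = True
        return {prop: matrix.multiply(c_vec).sum(axis=1)
                for prop, matrix in self._matrices.items()}

MLLMModel would then hold a single SubjectRelations object instead of three matrix attributes, which would trim _prepare_relations (complexity 6) and part of _candidates_to_features. The full source of annif.backend.mllm, as reviewed, follows.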

"""Maui-like Lexical Matching backend"""

import collections
import math
from enum import IntEnum
from statistics import mean
import os.path
import joblib
import numpy as np
from rdflib import URIRef
from rdflib.namespace import SKOS
from scipy.sparse import lil_matrix
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.ensemble import BaggingClassifier
from sklearn.tree import DecisionTreeClassifier
import annif.util
from annif.exception import NotInitializedException
from annif.suggestion import VectorSuggestionResult
from annif.lexical.tokenset import TokenSet, TokenSetIndex
from . import backend
from . import hyperopt

Term = collections.namedtuple('Term', 'subject_id label is_pref')
Match = collections.namedtuple(
    'Match', 'subject_id is_pref n_tokens pos ambiguity')
Candidate = collections.namedtuple(
    'Candidate',
    'doc_length subject_id freq is_pref n_tokens ambiguity ' +
    'first_occ last_occ spread')

Feature = IntEnum(
    'Feature',
    'freq doc_freq subj_freq tfidf is_pref n_tokens ambiguity ' +
    'first_occ last_occ spread doc_length ' +
    'broader narrower related',
    start=0)


class MLLMModel:
    """Maui-like Lexical Matching model"""

    def _conflate_matches(self, matches, doc_length):
        subj_matches = collections.defaultdict(list)
        for match in matches:
            subj_matches[match.subject_id].append(match)
        return [
            Candidate(
                doc_length=doc_length,
                subject_id=subject_id,
                freq=len(matches) / doc_length,
                is_pref=mean((float(m.is_pref) for m in matches)),
                n_tokens=mean((m.n_tokens for m in matches)),
                ambiguity=mean((m.ambiguity for m in matches)),
                first_occ=matches[0].pos / doc_length,
                last_occ=matches[-1].pos / doc_length,
                spread=(matches[-1].pos - matches[0].pos) / doc_length
            )
            for subject_id, matches in subj_matches.items()]

    def generate_candidates(self, text, analyzer):
        sentences = analyzer.tokenize_sentences(text)
        sent_tokens = self._vectorizer.transform(sentences)
        matches = []

        for sent_idx, token_matrix in enumerate(sent_tokens):
            tset = TokenSet(token_matrix.nonzero()[1])
            for ts, ambiguity in self._index.search(tset):
                matches.append(Match(subject_id=ts.subject_id,
                                     is_pref=ts.is_pref,
                                     n_tokens=len(ts),
                                     pos=sent_idx,
                                     ambiguity=ambiguity))

        return self._conflate_matches(matches, len(sentences))

    def _candidates_to_features(self, candidates):
        """Convert a list of Candidates to a NumPy feature matrix"""
        matrix = np.zeros((len(candidates), len(Feature)), dtype=np.float32)
        c_ids = [c.subject_id for c in candidates]
        c_vec = np.zeros(self._related_matrix.shape[0], dtype=np.bool)
        c_vec[c_ids] = True
        broader = self._broader_matrix.multiply(c_vec).sum(axis=1)
        narrower = self._narrower_matrix.multiply(c_vec).sum(axis=1)
        related = self._related_matrix.multiply(c_vec).sum(axis=1)
        for idx, c in enumerate(candidates):
            subj = c.subject_id
            matrix[idx, Feature.freq] = c.freq
            matrix[idx, Feature.doc_freq] = self._doc_freq[subj]
            matrix[idx, Feature.subj_freq] = self._subj_freq.get(subj, 1) - 1
            matrix[idx, Feature.tfidf] = c.freq * self._idf[subj]
            matrix[idx, Feature.is_pref] = c.is_pref
            matrix[idx, Feature.n_tokens] = c.n_tokens
            matrix[idx, Feature.ambiguity] = c.ambiguity
            matrix[idx, Feature.first_occ] = c.first_occ
            matrix[idx, Feature.last_occ] = c.last_occ
            matrix[idx, Feature.spread] = c.spread
            matrix[idx, Feature.doc_length] = c.doc_length
            matrix[idx, Feature.broader] = broader[subj, 0] / len(c_ids)
            matrix[idx, Feature.narrower] = narrower[subj, 0] / len(c_ids)
            matrix[idx, Feature.related] = related[subj, 0] / len(c_ids)
        return matrix

    def _prepare_terms(self, graph, vocab, params):
        terms = []
        subject_ids = []
        for subj_id, (uri, pref, _) in enumerate(vocab.subjects):
            if pref is None:
                continue  # deprecated subject
            subject_ids.append(subj_id)
            terms.append(Term(subject_id=subj_id, label=pref, is_pref=True))

            if annif.util.boolean(params['use_hidden_labels']):
                label_props = [SKOS.altLabel, SKOS.hiddenLabel]
            else:
                label_props = [SKOS.altLabel]

            for prop in label_props:
                for label in graph.objects(URIRef(uri), prop):
                    if label.language != params['language']:
                        continue
                    terms.append(Term(subject_id=subj_id,
                                      label=str(label),
                                      is_pref=False))
        return (terms, subject_ids)

    def _prepare_relations(self, graph, vocab):
        n_subj = len(vocab.subjects)
        self._broader_matrix = lil_matrix((n_subj, n_subj), dtype=np.bool)
        self._narrower_matrix = lil_matrix((n_subj, n_subj), dtype=np.bool)
        self._related_matrix = lil_matrix((n_subj, n_subj), dtype=np.bool)

        prop_matrix = [
            (SKOS.broader, self._broader_matrix),
            (SKOS.narrower, self._narrower_matrix),
            (SKOS.related, self._related_matrix)
        ]

        for subj_id, (uri, pref, _) in enumerate(vocab.subjects):
            if pref is None:
                continue  # deprecated subject

            for prop, matrix in prop_matrix:
                for other in graph.objects(URIRef(uri), prop):
                    other_id = vocab.subjects.by_uri(str(other),
                                                     warnings=False)
                    if other_id is not None:
                        matrix[subj_id, other_id] = True

    def _prepare_train_index(self, vocab, analyzer, params):
        graph = vocab.as_graph()
        terms, subject_ids = self._prepare_terms(graph, vocab, params)
        self._prepare_relations(graph, vocab)

        self._vectorizer = CountVectorizer(
            binary=True,
            tokenizer=analyzer.tokenize_words
        )
        label_corpus = self._vectorizer.fit_transform((t.label for t in terms))

        self._index = TokenSetIndex()
        for term, label_matrix in zip(terms, label_corpus):
            tokens = label_matrix.nonzero()[1]
            tset = TokenSet(tokens, term.subject_id, term.is_pref)
            self._index.add(tset)

        return subject_ids

    def prepare_train(self, corpus, vocab, analyzer, params):
        subject_ids = self._prepare_train_index(vocab, analyzer, params)

        # frequency of subjects (by id) in the generated candidates
        self._doc_freq = collections.Counter()
        # frequency of manually assigned subjects ("domain keyphraseness")
        self._subj_freq = collections.Counter()
        doc_count = 0
        train_x = []
        train_y = []
        for idx, doc in enumerate(corpus.documents):
            doc_subject_ids = [vocab.subjects.by_uri(uri)
                               for uri in doc.uris]
            self._subj_freq.update(doc_subject_ids)
            candidates = self.generate_candidates(doc.text, analyzer)
            self._doc_freq.update([c.subject_id for c in candidates])
            train_x.append(candidates)
            train_y += [(c.subject_id in doc_subject_ids) for c in candidates]
            doc_count += 1

        # precalculate idf values for candidate subjects
        self._idf = collections.defaultdict(float)
        for subj_id in subject_ids:
            self._idf[subj_id] = math.log((doc_count + 1) /
                                          (self._doc_freq[subj_id] + 1)) + 1
        return (np.vstack([self._candidates_to_features(candidates)
                           for candidates in train_x]), np.array(train_y))

    def _create_classifier(self, params):
        return BaggingClassifier(
            DecisionTreeClassifier(
                min_samples_leaf=int(params['min_samples_leaf']),
                max_leaf_nodes=int(params['max_leaf_nodes'])
            ), max_samples=float(params['max_samples']))

    def train(self, train_x, train_y, params):
        # fit the model on the training corpus
        self._classifier = self._create_classifier(params)
        self._classifier.fit(train_x, train_y)

    def _prediction_to_list(self, scores, candidates):
        subj_scores = [(score[1], c.subject_id)
                       for score, c in zip(scores, candidates)]
        return sorted(subj_scores, reverse=True)

    def predict(self, candidates):
        if not candidates:
            return []
        features = self._candidates_to_features(candidates)
        scores = self._classifier.predict_proba(features)
        return self._prediction_to_list(scores, candidates)


class MLLMOptimizer(hyperopt.HyperparameterOptimizer):
    """Hyperparameter optimizer for the MLLM backend"""

    def _prepare(self, n_jobs=1):
        self._backend.initialize()
        self._train_x, self._train_y = self._backend._load_train_data()
        self._candidates = []
        self._gold_subjects = []

        # TODO parallelize generation of candidates
        for doc in self._corpus.documents:
            candidates = self._backend._generate_candidates(doc.text)
            self._candidates.append(candidates)
            self._gold_subjects.append(
                annif.corpus.SubjectSet((doc.uris, doc.labels)))

    def _objective(self, trial):
        params = {
            'min_samples_leaf': trial.suggest_int('min_samples_leaf', 5, 30),
            'max_leaf_nodes': trial.suggest_int('max_leaf_nodes', 100, 2000),
            'max_samples': trial.suggest_float('max_samples', 0.5, 1.0),
            'use_hidden_labels':
                trial.suggest_categorical('use_hidden_labels', [True, False]),
            'limit': 100
        }
        model = self._backend._model._create_classifier(params)
        model.fit(self._train_x, self._train_y)

        batch = annif.eval.EvaluationBatch(self._backend.project.subjects)
        for goldsubj, candidates in zip(self._gold_subjects, self._candidates):
            if candidates:
                features = \
                    self._backend._model._candidates_to_features(candidates)
                scores = model.predict_proba(features)
                ranking = self._backend._model._prediction_to_list(
                    scores, candidates)
            else:
                ranking = []
            results = self._backend._prediction_to_result(ranking, params)
            batch.evaluate(results, goldsubj)
        results = batch.results(metrics=[self._metric])
        return results[self._metric]

    def _postprocess(self, study):
        bp = study.best_params
        lines = [
            f"min_samples_leaf={bp['min_samples_leaf']}",
            f"max_leaf_nodes={bp['max_leaf_nodes']}",
            f"max_samples={bp['max_samples']:.4f}",
            f"use_hidden_labels={bp['use_hidden_labels']}"
        ]
        return hyperopt.HPRecommendation(lines=lines, score=study.best_value)


class MLLMBackend(hyperopt.AnnifHyperoptBackend):
    """Maui-like Lexical Matching backend for Annif"""
    name = "mllm"
    needs_subject_index = True

    # defaults for unitialized instances
    _model = None

    MODEL_FILE = 'mllm-model.gz'
    TRAIN_FILE = 'mllm-train.gz'

    DEFAULT_PARAMETERS = {
        'min_samples_leaf': 20,
        'max_leaf_nodes': 1000,
        'max_samples': 0.9,
        'use_hidden_labels': False
    }

    def get_hp_optimizer(self, corpus, metric):
        return MLLMOptimizer(self, corpus, metric)

    def default_params(self):
        params = backend.AnnifBackend.DEFAULT_PARAMETERS.copy()
        params.update(self.DEFAULT_PARAMETERS)
        return params

    def _load_model(self):
        path = os.path.join(self.datadir, self.MODEL_FILE)
        self.debug('loading model from {}'.format(path))
        if os.path.exists(path):
            return joblib.load(path)
        else:
            raise NotInitializedException(
                'model {} not found'.format(path),
                backend_id=self.backend_id)

    def _load_train_data(self):
        path = os.path.join(self.datadir, self.TRAIN_FILE)
        if os.path.exists(path):
            return joblib.load(path)
        else:
            raise NotInitializedException(
                'train data file {} not found'.format(path),
                backend_id=self.backend_id)

    def initialize(self):
        if self._model is None:
            self._model = self._load_model()

    def _train(self, corpus, params):
        self.info('starting train')
        if corpus != 'cached':
            self.info("preparing training data")
            self._model = MLLMModel()
            train_data = self._model.prepare_train(corpus,
                                                   self.project.vocab,
                                                   self.project.analyzer,
                                                   params)
            annif.util.atomic_save(train_data,
                                   self.datadir,
                                   self.TRAIN_FILE,
                                   method=joblib.dump)
        else:
            self.info("reusing cached training data from previous run")
            self._model = self._load_model()
            train_data = self._load_train_data()

        self.info("training model")
        self._model.train(train_data[0], train_data[1], params)

        self.info('saving model')
        annif.util.atomic_save(
            self._model,
            self.datadir,
            self.MODEL_FILE,
            method=joblib.dump)

    def _generate_candidates(self, text):
        return self._model.generate_candidates(text, self.project.analyzer)

    def _prediction_to_result(self, prediction, params):
        vector = np.zeros(len(self.project.subjects), dtype=np.float32)
        for score, subject_id in prediction:
            vector[subject_id] = score
        result = VectorSuggestionResult(vector)
        return result.filter(self.project.subjects,
                             limit=int(params['limit']))

    def _suggest(self, text, params):
        candidates = self._generate_candidates(text)
        prediction = self._model.predict(candidates)
        return self._prediction_to_result(prediction, params)
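As a side note on the classifier configuration: with the DEFAULT_PARAMETERS above, _create_classifier boils down to a bagging ensemble of leaf-limited decision trees. The following standalone sketch reproduces that configuration; the random features and labels are purely illustrative stand-ins for real candidate feature matrices and are not part of the module.

# Illustrative only: the estimator built by _create_classifier with DEFAULT_PARAMETERS,
# fitted on synthetic data instead of real candidate features.
import numpy as np
from sklearn.ensemble import BaggingClassifier
from sklearn.tree import DecisionTreeClassifier

params = {'min_samples_leaf': 20, 'max_leaf_nodes': 1000, 'max_samples': 0.9}

classifier = BaggingClassifier(
    DecisionTreeClassifier(
        min_samples_leaf=int(params['min_samples_leaf']),
        max_leaf_nodes=int(params['max_leaf_nodes'])
    ), max_samples=float(params['max_samples']))

rng = np.random.default_rng(42)
train_x = rng.random((200, 14)).astype(np.float32)  # 14 features, as in len(Feature)
train_y = rng.integers(0, 2, size=200)              # binary relevance labels

classifier.fit(train_x, train_y)
scores = classifier.predict_proba(train_x[:5])      # column 1 = estimated relevance
print(scores[:, 1])

The hyperparameter ranges explored by MLLMOptimizer._objective (min_samples_leaf 5-30, max_leaf_nodes 100-2000, max_samples 0.5-1.0) vary exactly these three knobs.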