Passed
Pull Request — master (#462)
created by Osma, 02:10

annif.backend.mllm   Rating: F

Complexity

Total Complexity 68

Size/Duplication

Total Lines 419
Duplicated Lines 0 %

Importance

Changes 0
Metric   Value
eloc     322
dl       0
loc      419
rs       2.96
c        0
b        0
f        0
wmc      68

31 Methods

Rating   Name   Duplication   Size   Complexity  
C TokenSetIndex.search() 0 26 10
A TokenSet.__init__() 0 4 1
A TokenSet.sample() 0 6 2
A MLLMModel.generate_candidates() 0 15 3
A TokenSetIndex.add() 0 5 2
A TokenSet.__len__() 0 2 1
A TokenSet.contains() 0 5 1
A TokenSet.__iter__() 0 2 1
A TokenSetIndex.__len__() 0 2 1
A TokenSetIndex.__init__() 0 2 1
A MLLMModel._conflate_matches() 0 17 2
A MLLMModel._candidates_to_features() 0 22 2
A MLLMOptimizer._prepare() 0 12 2
A MLLMOptimizer._objective() 0 26 3
A MLLMBackend._load_train_data() 0 8 2
A MLLMBackend._generate_candidates() 0 2 1
A MLLMModel.prepare_train() 0 41 4
A MLLMBackend.initialize() 0 3 2
A MLLMBackend._train() 0 27 2
A MLLMBackend._load_model() 0 9 2
A MLLMBackend.default_params() 0 4 1
A MLLMOptimizer._postprocess() 0 9 1
A MLLMModel.train() 0 4 1
B MLLMModel._prepare_terms() 0 22 7
A MLLMModel._prepare_relations() 0 11 5
A MLLMModel._prediction_to_list() 0 4 1
A MLLMModel._create_classifier() 0 6 1
A MLLMBackend._prediction_to_result() 0 7 2
A MLLMModel.predict() 0 6 2
A MLLMBackend.get_hp_optimizer() 0 2 1
A MLLMBackend._suggest() 0 4 1

How to fix: Complexity

Complex classes like annif.backend.mllm often do a lot of different things. To break such a class down, identify a cohesive component within the class. A common way to find such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined which fields and methods belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often quicker to apply.
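As an illustration only, here is a minimal sketch of what Extract Class could look like in this module, assuming the candidate-feature bookkeeping (_doc_freq, _subj_freq, _idf and _candidates_to_features) were pulled out of MLLMModel into a separate helper. The CandidateFeatures name, the update/finalize split and the simplified three-column feature matrix are invented for the example; they are not part of this pull request.

# Hypothetical Extract Class sketch: the feature bookkeeping that MLLMModel
# currently owns moves into its own small class. Names and the reduced
# feature set are illustrative only.
import collections
import math

import numpy as np


class CandidateFeatures:
    """Collects corpus statistics and turns candidates into feature rows."""

    def __init__(self):
        self.doc_freq = collections.Counter()   # docs producing each candidate subject
        self.subj_freq = collections.Counter()  # manually assigned subject counts
        self.idf = collections.defaultdict(float)
        self.doc_count = 0

    def update(self, candidate_subject_ids, gold_subject_ids):
        """Record the candidates and gold subjects of one training document."""
        self.doc_freq.update(set(candidate_subject_ids))
        self.subj_freq.update(gold_subject_ids)
        self.doc_count += 1

    def finalize(self):
        """Precompute smoothed idf values once all documents have been seen."""
        for subj_id in self.doc_freq:
            self.idf[subj_id] = math.log(
                (self.doc_count + 1) / (self.doc_freq[subj_id] + 1)) + 1

    def to_matrix(self, candidates):
        """Convert (subject_id, freq) pairs into a small feature matrix."""
        matrix = np.zeros((len(candidates), 3), dtype=np.float32)
        for idx, (subj_id, freq) in enumerate(candidates):
            matrix[idx, 0] = freq
            matrix[idx, 1] = self.doc_freq[subj_id]
            matrix[idx, 2] = freq * self.idf[subj_id]
        return matrix


if __name__ == '__main__':
    # tiny usage example with made-up subject ids and frequencies
    features = CandidateFeatures()
    features.update(candidate_subject_ids=[1, 2], gold_subject_ids=[2])
    features.update(candidate_subject_ids=[2, 3], gold_subject_ids=[3])
    features.finalize()
    print(features.to_matrix([(2, 0.5), (3, 0.25)]))

With a split along these lines, MLLMModel would hold a single CandidateFeatures instance instead of several loose attributes, and _candidates_to_features would shrink to a call into the helper. The full source of annif/backend/mllm.py as reviewed follows.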

"""Maui-like Lexical Matching backend"""

import collections
import math
from enum import IntEnum
from statistics import mean
import os.path
import joblib
import numpy as np
from rdflib import URIRef
from rdflib.namespace import SKOS
from scipy.sparse import lil_matrix
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.ensemble import BaggingClassifier
from sklearn.tree import DecisionTreeClassifier
import annif.util
from annif.exception import NotInitializedException
from annif.suggestion import VectorSuggestionResult
from . import backend
from . import hyperopt

Term = collections.namedtuple('Term', 'subject_id label is_pref')
Match = collections.namedtuple(
    'Match', 'subject_id is_pref n_tokens pos ambiguity')
Candidate = collections.namedtuple(
    'Candidate',
    'doc_length subject_id freq is_pref n_tokens ambiguity ' +
    'first_occ last_occ spread')

Feature = IntEnum(
    'Feature',
    'freq doc_freq subj_freq tfidf is_pref n_tokens ambiguity ' +
    'first_occ last_occ spread doc_length ' +
    'related',
    start=0)

class TokenSet:
    """Represents a set of tokens (expressed as integer token IDs) that can
    be matched with another set of tokens. A TokenSet can optionally
    be associated with a subject from the vocabulary."""

    def __init__(self, tokens, subject_id=None, is_pref=False):
        self._tokens = set(tokens)
        self.subject_id = subject_id
        self.is_pref = is_pref

    def __len__(self):
        return len(self._tokens)

    def __iter__(self):
        return iter(self._tokens)

    def contains(self, other):
        """Returns True iff the tokens in the other TokenSet are all
        included within this TokenSet."""

        return other._tokens.issubset(self._tokens)

    def sample(self):
        """Return an arbitrary token from this TokenSet, or None if empty"""
        try:
            return next(iter(self._tokens))
        except StopIteration:
            return None

class TokenSetIndex:
    """A searchable index of TokenSets (representing vocabulary terms)"""

    def __init__(self):
        self._index = collections.defaultdict(set)

    def __len__(self):
        return len(self._index)

    def add(self, tset):
        """Add a TokenSet into this index"""
        token = tset.sample()
        if token is not None:
            self._index[token].add(tset)

    def search(self, tset):
        """Return the TokenSets that are contained in the given TokenSet.
        The matches are returned as a list of (TokenSet, ambiguity) pairs
        where ambiguity is an integer indicating the number of other TokenSets
        that also match the same tokens."""

        subj_tsets = {}
        subj_ambiguity = collections.Counter()

        for token in tset:
            for ts in self._index[token]:
                if not tset.contains(ts):
                    continue
                if ts.subject_id not in subj_tsets or \
                   not subj_tsets[ts.subject_id].is_pref:
                    subj_tsets[ts.subject_id] = ts

        for ts in subj_tsets.values():
            for other in subj_tsets.values():
                if ts == other:
                    continue
                if other.contains(ts):
                    subj_ambiguity.update([ts.subject_id])

        return [(ts, subj_ambiguity[ts.subject_id])
                for uri, ts in subj_tsets.items()]

class MLLMModel:
    """Maui-like Lexical Matching model"""

    def _conflate_matches(self, matches, doc_length):
        subj_matches = collections.defaultdict(list)
        for match in matches:
            subj_matches[match.subject_id].append(match)
        return [
            Candidate(
                doc_length=doc_length,
                subject_id=subject_id,
                freq=len(matches) / doc_length,
                is_pref=mean((float(m.is_pref) for m in matches)),
                n_tokens=mean((m.n_tokens for m in matches)),
                ambiguity=mean((m.ambiguity for m in matches)),
                first_occ=matches[0].pos / doc_length,
                last_occ=matches[-1].pos / doc_length,
                spread=(matches[-1].pos - matches[0].pos) / doc_length
            )
            for subject_id, matches in subj_matches.items()]

    def generate_candidates(self, text, analyzer):
        sentences = analyzer.tokenize_sentences(text)
        sent_tokens = self._vectorizer.transform(sentences)
        matches = []

        for sent_idx, token_matrix in enumerate(sent_tokens):
            tset = TokenSet(token_matrix.nonzero()[1])
            for ts, ambiguity in self._index.search(tset):
                matches.append(Match(subject_id=ts.subject_id,
                                     is_pref=ts.is_pref,
                                     n_tokens=len(ts),
                                     pos=sent_idx,
                                     ambiguity=ambiguity))

        return self._conflate_matches(matches, len(sentences))

    def _candidates_to_features(self, candidates):
        """Convert a list of Candidates to a NumPy feature matrix"""
        matrix = np.zeros((len(candidates), len(Feature)), dtype=np.float32)
        c_ids = [c.subject_id for c in candidates]
        c_vec = np.zeros(self._related_matrix.shape[0], dtype=np.bool)
        c_vec[c_ids] = True
        rels = self._related_matrix.multiply(c_vec).sum(axis=1)
        for idx, c in enumerate(candidates):
            subj = c.subject_id
            matrix[idx, Feature.freq] = c.freq
            matrix[idx, Feature.doc_freq] = self._doc_freq[subj]
            matrix[idx, Feature.subj_freq] = self._subj_freq.get(subj, 1) - 1
            matrix[idx, Feature.tfidf] = c.freq * self._idf[subj]
            matrix[idx, Feature.is_pref] = c.is_pref
            matrix[idx, Feature.n_tokens] = c.n_tokens
            matrix[idx, Feature.ambiguity] = c.ambiguity
            matrix[idx, Feature.first_occ] = c.first_occ
            matrix[idx, Feature.last_occ] = c.last_occ
            matrix[idx, Feature.spread] = c.spread
            matrix[idx, Feature.doc_length] = c.doc_length
            matrix[idx, Feature.related] = rels[subj, 0] / len(c_ids)
        return matrix

    def _prepare_terms(self, graph, vocab, params):
        terms = []
        subject_ids = []
        for subj_id, (uri, pref, _) in enumerate(vocab.subjects):
            if pref is None:
                continue  # deprecated subject
            subject_ids.append(subj_id)
            terms.append(Term(subject_id=subj_id, label=pref, is_pref=True))

            if annif.util.boolean(params['use_hidden_labels']):
                label_props = [SKOS.altLabel, SKOS.hiddenLabel]
            else:
                label_props = [SKOS.altLabel]

            for prop in label_props:
                for label in graph.objects(URIRef(uri), prop):
                    if label.language != params['language']:
                        continue
                    terms.append(Term(subject_id=subj_id,
                                      label=str(label),
                                      is_pref=False))
        return (terms, subject_ids)

    def _prepare_relations(self, graph, vocab):
        n_subj = len(vocab.subjects)
        self._related_matrix = lil_matrix((n_subj, n_subj), dtype=np.bool)

        for subj_id, (uri, pref, _) in enumerate(vocab.subjects):
            if pref is None:
                continue  # deprecated subject
            for related in graph.objects(URIRef(uri), SKOS.related):
                broad_id = vocab.subjects.by_uri(str(related), warnings=False)
                if broad_id is not None:
                    self._related_matrix[subj_id, broad_id] = True

    def prepare_train(self, corpus, vocab, analyzer, params):
        graph = vocab.as_graph()
        terms, subject_ids = self._prepare_terms(graph, vocab, params)
        self._prepare_relations(graph, vocab)

        self._vectorizer = CountVectorizer(
            binary=True,
            tokenizer=analyzer.tokenize_words
        )
        label_corpus = self._vectorizer.fit_transform((t.label for t in terms))

        self._index = TokenSetIndex()
        for term, label_matrix in zip(terms, label_corpus):
            tokens = label_matrix.nonzero()[1]
            tset = TokenSet(tokens, term.subject_id, term.is_pref)
            self._index.add(tset)

        # frequency of subjects (by id) in the generated candidates
        self._doc_freq = collections.Counter()
        # frequency of manually assigned subjects ("domain keyphraseness")
        self._subj_freq = collections.Counter()
        doc_count = 0
        train_x = []
        train_y = []
        for idx, doc in enumerate(corpus.documents):
            doc_subject_ids = [vocab.subjects.by_uri(uri)
                               for uri in doc.uris]
            self._subj_freq.update(doc_subject_ids)
            candidates = self.generate_candidates(doc.text, analyzer)
            self._doc_freq.update([c.subject_id for c in candidates])
            train_x.append(candidates)
            train_y += [(c.subject_id in doc_subject_ids) for c in candidates]
            doc_count += 1

        # precalculate idf values for candidate subjects
        self._idf = collections.defaultdict(float)
        for subj_id in subject_ids:
            self._idf[subj_id] = math.log((doc_count + 1) /
                                          (self._doc_freq[subj_id] + 1)) + 1
        return (np.vstack([self._candidates_to_features(candidates)
                           for candidates in train_x]), np.array(train_y))

    def _create_classifier(self, params):
        return BaggingClassifier(
            DecisionTreeClassifier(
                min_samples_leaf=int(params['min_samples_leaf']),
                max_leaf_nodes=int(params['max_leaf_nodes'])
            ), max_samples=float(params['max_samples']))

    def train(self, train_x, train_y, params):
        # fit the model on the training corpus
        self._classifier = self._create_classifier(params)
        self._classifier.fit(train_x, train_y)

    def _prediction_to_list(self, scores, candidates):
        subj_scores = [(score[1], c.subject_id)
                       for score, c in zip(scores, candidates)]
        return sorted(subj_scores, reverse=True)

    def predict(self, candidates):
        if not candidates:
            return []
        features = self._candidates_to_features(candidates)
        scores = self._classifier.predict_proba(features)
        return self._prediction_to_list(scores, candidates)

class MLLMOptimizer(hyperopt.HyperparameterOptimizer):
    """Hyperparameter optimizer for the MLLM backend"""

    def _prepare(self, n_jobs=1):
        self._backend.initialize()
        self._train_x, self._train_y = self._backend._load_train_data()
        self._candidates = []
        self._gold_subjects = []

        # TODO parallelize generation of candidates
        for doc in self._corpus.documents:
            candidates = self._backend._generate_candidates(doc.text)
            self._candidates.append(candidates)
            self._gold_subjects.append(
                annif.corpus.SubjectSet((doc.uris, doc.labels)))

    def _objective(self, trial):
        params = {
            'min_samples_leaf': trial.suggest_int('min_samples_leaf', 5, 30),
            'max_leaf_nodes': trial.suggest_int('max_leaf_nodes', 100, 2000),
            'max_samples': trial.suggest_float('max_samples', 0.5, 1.0),
            'use_hidden_labels':
                trial.suggest_categorical('use_hidden_labels', [True, False]),
            'limit': 100
        }
        model = self._backend._model._create_classifier(params)
        model.fit(self._train_x, self._train_y)

        batch = annif.eval.EvaluationBatch(self._backend.project.subjects)
        for goldsubj, candidates in zip(self._gold_subjects, self._candidates):
            if candidates:
                features = \
                    self._backend._model._candidates_to_features(candidates)
                scores = model.predict_proba(features)
                ranking = self._backend._model._prediction_to_list(
                    scores, candidates)
            else:
                ranking = []
            results = self._backend._prediction_to_result(ranking, params)
            batch.evaluate(results, goldsubj)
        results = batch.results(metrics=[self._metric])
        return results[self._metric]

    def _postprocess(self, study):
        bp = study.best_params
        lines = [
            f"min_samples_leaf={bp['min_samples_leaf']}",
            f"max_leaf_nodes={bp['max_leaf_nodes']}",
            f"max_samples={bp['max_samples']:.4f}",
            f"use_hidden_labels={bp['use_hidden_labels']}"
        ]
        return hyperopt.HPRecommendation(lines=lines, score=study.best_value)

class MLLMBackend(hyperopt.AnnifHyperoptBackend):
    """Maui-like Lexical Matching backend for Annif"""
    name = "mllm"
    needs_subject_index = True

    # defaults for uninitialized instances
    _model = None

    MODEL_FILE = 'mllm-model.gz'
    TRAIN_FILE = 'mllm-train.gz'

    DEFAULT_PARAMETERS = {
        'min_samples_leaf': 20,
        'max_leaf_nodes': 1000,
        'max_samples': 0.9,
        'use_hidden_labels': False
    }

    def get_hp_optimizer(self, corpus, metric):
        return MLLMOptimizer(self, corpus, metric)

    def default_params(self):
        params = backend.AnnifBackend.DEFAULT_PARAMETERS.copy()
        params.update(self.DEFAULT_PARAMETERS)
        return params

    def _load_model(self):
        path = os.path.join(self.datadir, self.MODEL_FILE)
        self.debug('loading model from {}'.format(path))
        if os.path.exists(path):
            return joblib.load(path)
        else:
            raise NotInitializedException(
                'model {} not found'.format(path),
                backend_id=self.backend_id)

    def _load_train_data(self):
        path = os.path.join(self.datadir, self.TRAIN_FILE)
        if os.path.exists(path):
            return joblib.load(path)
        else:
            raise NotInitializedException(
                'train data file {} not found'.format(path),
                backend_id=self.backend_id)

    def initialize(self):
        if self._model is None:
            self._model = self._load_model()

    def _train(self, corpus, params):
        self.info('starting train')
        if corpus != 'cached':
            self.info("preparing training data")
            self._model = MLLMModel()
            train_data = self._model.prepare_train(corpus,
                                                   self.project.vocab,
                                                   self.project.analyzer,
                                                   params)
            annif.util.atomic_save(train_data,
                                   self.datadir,
                                   self.TRAIN_FILE,
                                   method=joblib.dump)
        else:
            self.info("reusing cached training data from previous run")
            self._model = self._load_model()
            train_data = self._load_train_data()

        self.info("training model")
        self._model.train(train_data[0], train_data[1], params)

        self.info('saving model')
        annif.util.atomic_save(
            self._model,
            self.datadir,
            self.MODEL_FILE,
            method=joblib.dump)

    def _generate_candidates(self, text):
        return self._model.generate_candidates(text, self.project.analyzer)

    def _prediction_to_result(self, prediction, params):
        vector = np.zeros(len(self.project.subjects), dtype=np.float32)
        for score, subject_id in prediction:
            vector[subject_id] = score
        result = VectorSuggestionResult(vector)
        return result.filter(self.project.subjects,
                             limit=int(params['limit']))

    def _suggest(self, text, params):
        candidates = self._generate_candidates(text)
        prediction = self._model.predict(candidates)
        return self._prediction_to_result(prediction, params)