Pull Request — master (#462) by Osma · Passed · created · 01:53

annif.backend.mllm.MLLMBackend._train()   A

Complexity
  Conditions: 2

Size
  Total Lines: 27
  Code Lines: 24

Duplication
  Lines: 0
  Ratio: 0 %

Importance
  Changes: 0

Metric  Value
eloc    24
dl      0
loc     27
rs      9.304
c       0
b       0
f       0
cc      2
nop     3

"""Maui-like Lexical Matching backend"""

import collections
import math
from enum import IntEnum
from statistics import mean
import os.path
import joblib
import numpy as np
from rdflib import URIRef
from rdflib.namespace import SKOS
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import FunctionTransformer
from sklearn.ensemble import BaggingClassifier
from sklearn.tree import DecisionTreeClassifier
# annif.corpus and annif.eval are referenced by MLLMOptimizer below
import annif.corpus
import annif.eval
import annif.util
from annif.exception import NotInitializedException
from annif.suggestion import VectorSuggestionResult
from . import backend
from . import hyperopt

Term = collections.namedtuple('Term', 'subject_id label is_pref')
Match = collections.namedtuple(
    'Match', 'subject_id is_pref n_tokens pos ambiguity')
Candidate = collections.namedtuple(
    'Candidate',
    'doc_length subject_id freq is_pref n_tokens ambiguity ' +
    'first_occ last_occ spread')

Feature = IntEnum(
    'Feature',
    'freq doc_freq subj_freq tfidf is_pref n_tokens ambiguity ' +
    'first_occ last_occ spread doc_length',
    start=0)
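
# With start=0 the Feature members double as zero-based column indices for the
# feature matrix built in MLLMModel._candidates_to_features below, e.g.
# Feature.freq == 0 and Feature.doc_length == 10 (11 features in total).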


class TokenSet:
    """Represents a set of tokens (expressed as integer token IDs) that can
    be matched with another set of tokens. A TokenSet can optionally
    be associated with a subject from the vocabulary."""

    def __init__(self, tokens, subject_id=None, is_pref=False):
        self._tokens = set(tokens)
        self.subject_id = subject_id
        self.is_pref = is_pref

    def __len__(self):
        return len(self._tokens)

    def __iter__(self):
        return iter(self._tokens)

    def contains(self, other):
        """Returns True iff the tokens in the other TokenSet are all
        included within this TokenSet."""

        return other._tokens.issubset(self._tokens)

    def sample(self):
        """Return an arbitrary token from this TokenSet, or None if empty"""
        try:
            return next(iter(self._tokens))
        except StopIteration:
            return None


class TokenSetIndex:
    """A searchable index of TokenSets (representing vocabulary terms)"""

    def __init__(self):
        self._index = collections.defaultdict(set)

    def __len__(self):
        return len(self._index)

    def add(self, tset):
        """Add a TokenSet into this index"""
        token = tset.sample()
        if token is not None:
            self._index[token].add(tset)

    def search(self, tset):
        """Return the TokenSets that are contained in the given TokenSet.
        The matches are returned as a list of (TokenSet, ambiguity) pairs
        where ambiguity is an integer indicating the number of other TokenSets
        that also match the same tokens."""

        subj_tsets = {}
        subj_ambiguity = collections.Counter()

        for token in tset:
            for ts in self._index[token]:
                if not tset.contains(ts):
                    continue
                if ts.subject_id not in subj_tsets or \
                   not subj_tsets[ts.subject_id].is_pref:
                    subj_tsets[ts.subject_id] = ts

        for ts in subj_tsets.values():
            for other in subj_tsets.values():
                if ts == other:
                    continue
                if other.contains(ts):
                    subj_ambiguity.update([ts.subject_id])

        return [(ts, subj_ambiguity[ts.subject_id])
                for ts in subj_tsets.values()]
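
# Illustrative sketch of how the two classes above work together; the token
# and subject IDs here are made up:
#
#   index = TokenSetIndex()
#   index.add(TokenSet([1, 2], subject_id=42, is_pref=True))  # e.g. "dog breeding"
#   index.add(TokenSet([1], subject_id=7, is_pref=True))      # e.g. "dog"
#   index.search(TokenSet([1, 2, 5]))
#   # Both vocabulary TokenSets are contained in the document tokens, so both
#   # are returned (in no particular order): subject 42 with ambiguity 0 and
#   # subject 7 with ambiguity 1, because the "dog breeding" TokenSet also
#   # contains the "dog" TokenSet.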


class MLLMModel:
    """Maui-like Lexical Matching model"""

    def _conflate_matches(self, matches, doc_length):
        subj_matches = collections.defaultdict(list)
        for match in matches:
            subj_matches[match.subject_id].append(match)
        return [
            Candidate(
                doc_length=doc_length,
                subject_id=subject_id,
                freq=len(matches) / doc_length,
                is_pref=mean((float(m.is_pref) for m in matches)),
                n_tokens=mean((m.n_tokens for m in matches)),
                ambiguity=mean((m.ambiguity for m in matches)),
                first_occ=matches[0].pos / doc_length,
                last_occ=matches[-1].pos / doc_length,
                spread=(matches[-1].pos - matches[0].pos) / doc_length
            )
            for subject_id, matches in subj_matches.items()]

    def generate_candidates(self, text, analyzer):
        sentences = analyzer.tokenize_sentences(text)
        sent_tokens = self._vectorizer.transform(sentences)
        matches = []

        for sent_idx, token_matrix in enumerate(sent_tokens):
            tset = TokenSet(token_matrix.nonzero()[1])
            for ts, ambiguity in self._index.search(tset):
                matches.append(Match(subject_id=ts.subject_id,
                                     is_pref=ts.is_pref,
                                     n_tokens=len(ts),
                                     pos=sent_idx,
                                     ambiguity=ambiguity))

        return self._conflate_matches(matches, len(sentences))

    def _candidates_to_features(self, candidates):
        """Convert a list of Candidates to a NumPy feature matrix"""
        matrix = np.zeros((len(candidates), len(Feature)), dtype=np.float32)
        for idx, c in enumerate(candidates):
            subj = c.subject_id
            matrix[idx, Feature.freq] = c.freq
            matrix[idx, Feature.doc_freq] = self._doc_freq[subj]
            matrix[idx, Feature.subj_freq] = self._subj_freq.get(subj, 1) - 1
            matrix[idx, Feature.tfidf] = c.freq * self._idf[subj]
            matrix[idx, Feature.is_pref] = c.is_pref
            matrix[idx, Feature.n_tokens] = c.n_tokens
            matrix[idx, Feature.ambiguity] = c.ambiguity
            matrix[idx, Feature.first_occ] = c.first_occ
            matrix[idx, Feature.last_occ] = c.last_occ
            matrix[idx, Feature.spread] = c.spread
            matrix[idx, Feature.doc_length] = c.doc_length
        return matrix
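
    # Each Candidate above becomes one row of the matrix: for example, a
    # candidate with freq=0.10 for a subject whose precalculated idf is 2.0
    # gets 0.10 * 2.0 = 0.20 in the Feature.tfidf column, and subj_freq falls
    # back to 0 for subjects that never occur as manually assigned subjects
    # (the .get(subj, 1) - 1 idiom).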

    def prepare_train(self, corpus, vocab, analyzer, params):
        graph = vocab.as_graph()
        terms = []
        subject_ids = []
        for subj_id, (uri, pref, _) in enumerate(vocab.subjects):
            if pref is None:
                continue  # deprecated subject
            subject_ids.append(subj_id)
            terms.append(Term(subject_id=subj_id, label=pref, is_pref=True))
            alts = graph.preferredLabel(URIRef(uri),
                                        lang=params['language'],
                                        labelProperties=[SKOS.altLabel])
            for label, _ in alts:
                terms.append(Term(subject_id=subj_id,
                                  label=str(label),
                                  is_pref=False))

        self._vectorizer = CountVectorizer(
            binary=True,
            tokenizer=analyzer.tokenize_words
        )
        label_corpus = self._vectorizer.fit_transform((t.label for t in terms))

        self._index = TokenSetIndex()
        for term, label_matrix in zip(terms, label_corpus):
            tokens = label_matrix.nonzero()[1]
            tset = TokenSet(tokens, term.subject_id, term.is_pref)
            self._index.add(tset)

        # frequency of subjects (by id) in the generated candidates
        self._doc_freq = collections.Counter()
        # frequency of manually assigned subjects ("domain keyphraseness")
        self._subj_freq = collections.Counter()
        doc_count = 0
        train_X = []
        train_y = []
        for doc in corpus.documents:
            doc_subject_ids = [vocab.subjects.by_uri(uri)
                               for uri in doc.uris]
            self._subj_freq.update(doc_subject_ids)
            candidates = self.generate_candidates(doc.text, analyzer)
            self._doc_freq.update([c.subject_id for c in candidates])
            train_X += candidates
            train_y += [(c.subject_id in doc_subject_ids) for c in candidates]
            doc_count += 1

        # precalculate idf values for candidate subjects
        self._idf = collections.defaultdict(float)
        for subj_id in subject_ids:
            self._idf[subj_id] = math.log((doc_count + 1) /
                                          (self._doc_freq[subj_id] + 1)) + 1
        return (train_X, train_y)

    def _create_classifier(self, params):
        # define a sklearn pipeline with transformer and classifier
        return Pipeline(
            steps=[
                ('transformer', FunctionTransformer(
                    self._candidates_to_features)),
                ('classifier', BaggingClassifier(
                    DecisionTreeClassifier(
                        min_samples_leaf=int(params['min_samples_leaf']),
                        max_leaf_nodes=int(params['max_leaf_nodes'])
                    ), max_samples=float(params['max_samples'])))])

    def train(self, train_X, train_y, params):
        # fit the model on the training corpus
        self._classifier = self._create_classifier(params)
        self._classifier.fit(train_X, train_y)

    def _prediction_to_list(self, scores, candidates):
        subj_scores = [(score[1], c.subject_id)
                       for score, c in zip(scores, candidates)]
        return sorted(subj_scores, reverse=True)

    def predict(self, candidates):
        if not candidates:
            return []
        scores = self._classifier.predict_proba(candidates)
        return self._prediction_to_list(scores, candidates)
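
# Minimal usage sketch for MLLMModel, assuming a document corpus plus the
# vocab, analyzer and params objects that MLLMBackend (below) passes in:
#
#   model = MLLMModel()
#   train_X, train_y = model.prepare_train(corpus, vocab, analyzer, params)
#   model.train(train_X, train_y, params)
#   candidates = model.generate_candidates("Some document text ...", analyzer)
#   for score, subject_id in model.predict(candidates):
#       print(subject_id, score)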


class MLLMOptimizer(hyperopt.HyperparameterOptimizer):
    """Hyperparameter optimizer for the MLLM backend"""

    def _prepare(self, n_jobs=1):
        self._backend.initialize()
        self._train_X, self._train_y = self._backend._load_train_data()
        self._candidates = []
        self._gold_subjects = []

        # TODO parallelize generation of candidates
        for doc in self._corpus.documents:
            candidates = self._backend._generate_candidates(doc.text)
            self._candidates.append(candidates)
            self._gold_subjects.append(
                annif.corpus.SubjectSet((doc.uris, doc.labels)))

    def _objective(self, trial):
        params = {
            'min_samples_leaf': trial.suggest_int('min_samples_leaf', 5, 30),
            'max_leaf_nodes': trial.suggest_int('max_leaf_nodes', 100, 2000),
            'max_samples': trial.suggest_float('max_samples', 0.5, 1.0),
            'limit': 100
        }
        model = self._backend._model._create_classifier(params)
        model.fit(self._train_X, self._train_y)

        batch = annif.eval.EvaluationBatch(self._backend.project.subjects)
        for goldsubj, candidates in zip(self._gold_subjects, self._candidates):
            scores = model.predict_proba(candidates)
            ranking = self._backend._model._prediction_to_list(
                scores, candidates)
            results = self._backend._prediction_to_result(ranking, params)
            batch.evaluate(results, goldsubj)
        results = batch.results(metrics=[self._metric])
        return results[self._metric]

    def _postprocess(self, study):
        bp = study.best_params
        lines = [
            f"min_samples_leaf={bp['min_samples_leaf']}",
            f"max_leaf_nodes={bp['max_leaf_nodes']}",
            f"max_samples={bp['max_samples']:.4f}"
        ]
        return hyperopt.HPRecommendation(lines=lines, score=study.best_value)
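
# The optimizer above is obtained via MLLMBackend.get_hp_optimizer() (below)
# and is typically driven by the `annif hyperopt` CLI command; each Optuna
# trial refits the classifier pipeline on the cached training data and
# evaluates it against the gold standard subjects collected in _prepare().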


class MLLMBackend(hyperopt.AnnifHyperoptBackend):
    """Maui-like Lexical Matching backend for Annif"""
    name = "mllm"
    needs_subject_index = True

    # defaults for uninitialized instances
    _model = None

    MODEL_FILE = 'mllm-model.gz'
    TRAIN_FILE = 'mllm-train.gz'

    DEFAULT_PARAMETERS = {
        'min_samples_leaf': 20,
        'max_leaf_nodes': 1000,
        'max_samples': 0.9
    }

    def get_hp_optimizer(self, corpus, metric):
        return MLLMOptimizer(self, corpus, metric)

    def default_params(self):
        params = backend.AnnifBackend.DEFAULT_PARAMETERS.copy()
        params.update(self.DEFAULT_PARAMETERS)
        return params

    def _load_model(self):
        path = os.path.join(self.datadir, self.MODEL_FILE)
        self.debug('loading model from {}'.format(path))
        if os.path.exists(path):
            return joblib.load(path)
        else:
            raise NotInitializedException(
                'model {} not found'.format(path),
                backend_id=self.backend_id)

    def _load_train_data(self):
        path = os.path.join(self.datadir, self.TRAIN_FILE)
        if os.path.exists(path):
            return joblib.load(path)
        else:
            raise NotInitializedException(
                'train data file {} not found'.format(path),
                backend_id=self.backend_id)

    def initialize(self):
        if self._model is None:
            self._model = self._load_model()

    def _train(self, corpus, params):
        self.info('starting train')
        if corpus != 'cached':
            self.info("preparing training data")
            self._model = MLLMModel()
            train_data = self._model.prepare_train(corpus,
                                                   self.project.vocab,
                                                   self.project.analyzer,
                                                   params)
            annif.util.atomic_save(train_data,
                                   self.datadir,
                                   self.TRAIN_FILE,
                                   method=joblib.dump)
        else:
            self.info("reusing cached training data from previous run")
            self._model = self._load_model()
            train_data = self._load_train_data()

        self.info("training model")
        self._model.train(train_data[0], train_data[1], params)

        self.info('saving model')
        annif.util.atomic_save(
            self._model,
            self.datadir,
            self.MODEL_FILE,
            method=joblib.dump)

    def _generate_candidates(self, text):
        return self._model.generate_candidates(text, self.project.analyzer)

    def _prediction_to_result(self, prediction, params):
        vector = np.zeros(len(self.project.subjects), dtype=np.float32)
        for score, subject_id in prediction:
            vector[subject_id] = score
        result = VectorSuggestionResult(vector)
        return result.filter(self.project.subjects,
                             limit=int(params['limit']))

    def _suggest(self, text, params):
        candidates = self._generate_candidates(text)
        prediction = self._model.predict(candidates)
        return self._prediction_to_result(prediction, params)
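
Usage note: the backend above is normally exercised through the Annif CLI
rather than called directly. A minimal sketch, assuming a project configured
with backend=mllm in projects.cfg (the project ID my-mllm-project is a
placeholder):

    annif train my-mllm-project /path/to/training-corpus
    annif hyperopt my-mllm-project /path/to/validation-corpus
    annif suggest my-mllm-project < document.txt

The _train() method analysed in this report backs the first command; running
annif train with the --cached option reuses the training data saved earlier to
mllm-train.gz (the corpus == 'cached' branch above).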