Passed
Pull Request against master (#462) by Osma, created 01:48

annif.backend.mllm.MLLMBackend._load_model() (rated A)

Complexity
    Conditions: 2

Size
    Total lines: 9
    Code lines: 8

Duplication
    Duplicated lines: 0
    Ratio: 0 %

Importance
    Changes: 0

Metric                            Value
eloc (effective lines of code)        8
dl                                    0
loc (lines of code)                   9
rs                                   10
c                                     0
b                                     0
f                                     0
cc (cyclomatic complexity)            2
nop (number of parameters)            1

"""Maui-like Lexical Matching backend"""

import collections
import math
from enum import IntEnum
from statistics import mean
import os.path
import joblib
import numpy as np
from rdflib import URIRef
from rdflib.namespace import SKOS
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import FunctionTransformer
from sklearn.ensemble import BaggingClassifier
from sklearn.tree import DecisionTreeClassifier
import annif.util
from annif.exception import NotInitializedException
from annif.suggestion import VectorSuggestionResult
from . import backend

Term = collections.namedtuple('Term', 'subject_id label is_pref')
Match = collections.namedtuple(
    'Match', 'subject_id is_pref n_tokens pos ambiguity')
Candidate = collections.namedtuple(
    'Candidate',
    'doc_length subject_id freq is_pref n_tokens ambiguity ' +
    'first_occ last_occ spread')

Feature = IntEnum(
    'Feature',
    'freq doc_freq subj_freq tfidf is_pref n_tokens ambiguity ' +
    'first_occ last_occ spread doc_length',
    start=0)


class TokenSet:
    """Represents a set of tokens (expressed as integer token IDs) that can
    be matched with another set of tokens. A TokenSet can optionally
    be associated with a subject from the vocabulary."""

    def __init__(self, tokens, subject_id=None, is_pref=False):
        self._tokens = set(tokens)
        self.subject_id = subject_id
        self.is_pref = is_pref

    def __len__(self):
        return len(self._tokens)

    def __iter__(self):
        return iter(self._tokens)

    def contains(self, other):
        """Returns True iff the tokens in the other TokenSet are all
        included within this TokenSet."""

        return other._tokens.issubset(self._tokens)

    def sample(self):
        """Return an arbitrary token from this TokenSet, or None if empty"""
        try:
            return next(iter(self._tokens))
        except StopIteration:
            return None
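
# Illustrative sketch of TokenSet semantics (token IDs are arbitrary integers
# produced by the vectorizer):
#
#     TokenSet([1, 2, 3]).contains(TokenSet([1, 3]))   # True  (subset)
#     TokenSet([1, 3]).contains(TokenSet([1, 2, 3]))   # False
#     TokenSet([]).sample()                            # None  (empty set)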


class TokenSetIndex:
    """A searchable index of TokenSets (representing vocabulary terms)"""

    def __init__(self):
        self._index = collections.defaultdict(set)

    def __len__(self):
        return len(self._index)

    def add(self, tset):
        """Add a TokenSet into this index"""
        token = tset.sample()
        if token is not None:
            self._index[token].add(tset)

    def search(self, tset):
        """Return the TokenSets that are contained in the given TokenSet.
        The matches are returned as a list of (TokenSet, ambiguity) pairs
        where ambiguity is an integer indicating the number of other TokenSets
        that also match the same tokens."""

        subj_tsets = {}
        subj_ambiguity = collections.Counter()

        for token in tset:
            for ts in self._index[token]:
                if not tset.contains(ts):
                    continue
                if ts.subject_id not in subj_tsets or \
                   not subj_tsets[ts.subject_id].is_pref:
                    subj_tsets[ts.subject_id] = ts

        for ts in subj_tsets.values():
            for other in subj_tsets.values():
                if ts == other:
                    continue
                if other.contains(ts):
                    subj_ambiguity.update([ts.subject_id])

        return [(ts, subj_ambiguity[ts.subject_id])
                for ts in subj_tsets.values()]
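
# Illustrative sketch of the index (token IDs and subject IDs are made up for
# the example):
#
#     index = TokenSetIndex()
#     index.add(TokenSet([1, 2], subject_id=7, is_pref=True))
#     index.add(TokenSet([1], subject_id=8, is_pref=True))
#     index.search(TokenSet([1, 2, 5]))
#     # -> both subjects are returned; subject 8 gets ambiguity 1 because
#     #    subject 7's TokenSet also covers its token, subject 7 gets 0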


class MLLMModel:
    """Maui-like Lexical Matching model"""

    def _conflate_matches(self, matches, doc_length):
        subj_matches = collections.defaultdict(list)
        for match in matches:
            subj_matches[match.subject_id].append(match)
        return [
            Candidate(
                doc_length=doc_length,
                subject_id=subject_id,
                freq=len(matches) / doc_length,
                is_pref=mean((float(m.is_pref) for m in matches)),
                n_tokens=mean((m.n_tokens for m in matches)),
                ambiguity=mean((m.ambiguity for m in matches)),
                first_occ=matches[0].pos / doc_length,
                last_occ=matches[-1].pos / doc_length,
                spread=(matches[-1].pos - matches[0].pos) / doc_length
            )
            for subject_id, matches in subj_matches.items()]
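
    # Illustrative example: a subject matched in sentences 2 and 7 of a
    # 10-sentence document yields a Candidate with freq=0.2, first_occ=0.2,
    # last_occ=0.7 and spread=0.5 (positions are normalized by doc_length).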

    def _generate_candidates(self, text, analyzer):
        sentences = analyzer.tokenize_sentences(text)
        sent_tokens = self._vectorizer.transform(sentences)
        matches = []

        for sent_idx, token_matrix in enumerate(sent_tokens):
            tset = TokenSet(token_matrix.nonzero()[1])
            for ts, ambiguity in self._index.search(tset):
                matches.append(Match(subject_id=ts.subject_id,
                                     is_pref=ts.is_pref,
                                     n_tokens=len(ts),
                                     pos=sent_idx,
                                     ambiguity=ambiguity))

        return self._conflate_matches(matches, len(sentences))

    def _candidates_to_features(self, candidates):
        """Convert a list of Candidates to a NumPy feature matrix"""
        matrix = np.zeros((len(candidates), len(Feature)), dtype=np.float32)
        for idx, c in enumerate(candidates):
            subj = c.subject_id
            matrix[idx, Feature.freq] = c.freq
            matrix[idx, Feature.doc_freq] = self._doc_freq[subj]
            matrix[idx, Feature.subj_freq] = self._subj_freq.get(subj, 1) - 1
            matrix[idx, Feature.tfidf] = c.freq * self._idf[subj]
            matrix[idx, Feature.is_pref] = c.is_pref
            matrix[idx, Feature.n_tokens] = c.n_tokens
            matrix[idx, Feature.ambiguity] = c.ambiguity
            matrix[idx, Feature.first_occ] = c.first_occ
            matrix[idx, Feature.last_occ] = c.last_occ
            matrix[idx, Feature.spread] = c.spread
            matrix[idx, Feature.doc_length] = c.doc_length
        return matrix
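
    # The resulting matrix has one row per Candidate and len(Feature) == 11
    # columns, indexed by the Feature IntEnum defined at module level.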

    def prepare(self, corpus, vocab, analyzer, params):
        graph = vocab.as_graph()
        terms = []
        subject_ids = []
        for subj_id, (uri, pref, _) in enumerate(vocab.subjects):
            if pref is None:
                continue  # deprecated subject
            subject_ids.append(subj_id)
            terms.append(Term(subject_id=subj_id, label=pref, is_pref=True))
            alts = graph.preferredLabel(URIRef(uri),
                                        lang=params['language'],
                                        labelProperties=[SKOS.altLabel])
            for label, _ in alts:
                terms.append(Term(subject_id=subj_id,
                                  label=str(label),
                                  is_pref=False))

        self._vectorizer = CountVectorizer(
            binary=True,
            tokenizer=analyzer.tokenize_words
        )
        label_corpus = self._vectorizer.fit_transform((t.label for t in terms))

        self._index = TokenSetIndex()
        for term, label_matrix in zip(terms, label_corpus):
            tokens = label_matrix.nonzero()[1]
            tset = TokenSet(tokens, term.subject_id, term.is_pref)
            self._index.add(tset)

        # frequency of subjects (by id) in the generated candidates
        self._doc_freq = collections.Counter()
        # frequency of manually assigned subjects ("domain keyphraseness")
        self._subj_freq = collections.Counter()
        doc_count = 0
        train_X = []
        train_y = []
        for idx, doc in enumerate(corpus.documents):
            doc_subject_ids = [vocab.subjects.by_uri(uri)
                               for uri in doc.uris]
            self._subj_freq.update(doc_subject_ids)
            candidates = self._generate_candidates(doc.text, analyzer)
            self._doc_freq.update([c.subject_id for c in candidates])
            train_X += candidates
            train_y += [(c.subject_id in doc_subject_ids) for c in candidates]
            doc_count += 1

        # precalculate idf values for candidate subjects
        self._idf = collections.defaultdict(float)
        for subj_id in subject_ids:
            self._idf[subj_id] = math.log((doc_count + 1) /
                                          (self._doc_freq[subj_id] + 1)) + 1
        return (train_X, train_y)
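
    # prepare() returns the training data as two parallel lists: train_X holds
    # Candidate tuples and train_y holds booleans indicating whether the
    # candidate subject was among the document's manually assigned subjects.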

    def train(self, train_X, train_y, params):
        # define a sklearn pipeline with transformer and classifier
        self._model = Pipeline(
            steps=[
                ('transformer', FunctionTransformer(
                    self._candidates_to_features)),
                ('classifier', BaggingClassifier(
                    DecisionTreeClassifier(
                        min_samples_leaf=int(params['min_samples_leaf']),
                        max_leaf_nodes=int(params['max_leaf_nodes'])
                    ), max_samples=float(params['max_samples'])))])
        # fit the model on the training corpus
        self._model.fit(train_X, train_y)
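
    # Roughly equivalent to running the two pipeline steps by hand:
    #     features = self._candidates_to_features(train_X)
    #     BaggingClassifier(DecisionTreeClassifier(...)).fit(features, train_y)
    # Wrapping them in a Pipeline lets predict_proba() accept raw Candidate
    # lists in predict() below.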

    def predict(self, text, analyzer):
        candidates = self._generate_candidates(text, analyzer)
        if not candidates:
            return []
        scores = self._model.predict_proba(candidates)
        subj_scores = [(score[1], c.subject_id)
                       for score, c in zip(scores, candidates)]
        return sorted(subj_scores, reverse=True)
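
    # predict() yields (score, subject_id) pairs sorted by descending score,
    # where the score is the classifier's probability for the positive class.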


class MLLMBackend(backend.AnnifBackend):
    """Maui-like Lexical Matching backend for Annif"""
    name = "mllm"
    needs_subject_index = True

    # defaults for uninitialized instances
    _model = None

    MODEL_FILE = 'mllm-model.gz'
    TRAIN_FILE = 'mllm-train.gz'

    DEFAULT_PARAMETERS = {
        'min_samples_leaf': 20,
        'max_leaf_nodes': 1000,
        'max_samples': 0.9
    }

    def default_params(self):
        params = backend.AnnifBackend.DEFAULT_PARAMETERS.copy()
        params.update(self.DEFAULT_PARAMETERS)
        return params

    def _load_model(self):
        path = os.path.join(self.datadir, self.MODEL_FILE)
        self.debug('loading model from {}'.format(path))
        if os.path.exists(path):
            return joblib.load(path)
        else:
            raise NotInitializedException(
                'model {} not found'.format(path),
                backend_id=self.backend_id)

    def initialize(self):
        if self._model is None:
            self._model = self._load_model()

    def _train(self, corpus, params):
        self.info('starting train')
        if corpus != 'cached':
            self.info("preparing training data")
            self._model = MLLMModel()
            train_data = self._model.prepare(corpus,
                                             self.project.vocab,
                                             self.project.analyzer,
                                             params)
            annif.util.atomic_save(train_data,
                                   self.datadir,
                                   self.TRAIN_FILE,
                                   method=joblib.dump)
        else:
            self.info("reusing cached training data from previous run")
            self._model = self._load_model()
            path = os.path.join(self.datadir, self.TRAIN_FILE)
            if os.path.exists(path):
                train_data = joblib.load(path)
            else:
                raise NotInitializedException(
                    'train data file {} not found'.format(path),
                    backend_id=self.backend_id)

        self.info("training model")
        self._model.train(train_data[0], train_data[1], params)

        self.info('saving model')
        annif.util.atomic_save(
            self._model,
            self.datadir,
            self.MODEL_FILE,
            method=joblib.dump)

    def _suggest(self, text, params):
        vector = np.zeros(len(self.project.subjects), dtype=np.float32)
        for score, subject_id in self._model.predict(text,
                                                     self.project.analyzer):
            vector[subject_id] = score
        result = VectorSuggestionResult(vector)
        return result.filter(self.project.subjects,
                             limit=int(params['limit']))
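
# Illustrative configuration sketch (hypothetical project name and vocabulary;
# exact projects.cfg keys depend on the Annif installation): a project using
# this backend sets backend=mllm and may override the parameters defined in
# DEFAULT_PARAMETERS above, e.g.
#
#     [my-mllm-project]
#     language=en
#     backend=mllm
#     vocab=my-vocab
#     analyzer=snowball(english)
#     min_samples_leaf=20
#     max_leaf_nodes=1000
#     max_samples=0.9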