Passed · Pull Request — master (#462) · by Osma · created 01:56

annif.backend.mllm   B

Complexity

Total Complexity 43

Size/Duplication

Total Lines 297
Duplicated Lines 0%

Importance

Changes 0
Metric Value
wmc 43
eloc 226
dl 0
loc 297
rs 8.96
c 0
b 0
f 0

18 Methods

Rating   Name   Duplication   Size   Complexity  
A MLLMModel._candidates_to_features() 0 17 2
C TokenSetIndex.search() 0 26 10
A TokenSet.sample() 0 6 2
A TokenSet.__init__() 0 4 1
A MLLMBackend.initialize() 0 10 3
A MLLMBackend._train() 0 15 1
A TokenSet.__len__() 0 2 1
A TokenSetIndex.add() 0 5 2
B MLLMModel.train() 0 65 7
A MLLMBackend.default_params() 0 4 1
A TokenSet.contains() 0 5 1
A TokenSet.__iter__() 0 2 1
A TokenSetIndex.__len__() 0 2 1
A MLLMModel.predict() 0 8 2
A TokenSetIndex.__init__() 0 2 1
A MLLMModel._conflate_matches() 0 17 2
A MLLMModel._generate_candidates() 0 15 3
A MLLMBackend._suggest() 0 8 2

How to fix: Complexity

Complex classes like annif.backend.mllm often do a lot of different things. To break such a class down, we need to identify a cohesive component within it. A common way to find such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined the fields and methods that belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster.
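As an illustration of Extract Class, the per-subject frequency bookkeeping in MLLMModel (_doc_freq, _subj_freq and the idf values) could be pulled out into its own class. The sketch below is hypothetical: the name CandidateStatistics and its methods are not part of annif.backend.mllm; they only show what such an extraction might look like.

# Hypothetical Extract Class sketch: CandidateStatistics is not part of
# annif.backend.mllm; it groups the _doc_freq / _subj_freq / _idf
# bookkeeping that MLLMModel.train() currently handles inline.
import collections
import math


class CandidateStatistics:
    """Per-subject frequency statistics gathered over a training corpus."""

    def __init__(self):
        self.doc_freq = collections.Counter()   # subjects seen among candidates
        self.subj_freq = collections.Counter()  # manually assigned subjects
        self.doc_count = 0

    def update(self, doc_subject_ids, candidates):
        """Record one document's gold subjects and generated candidates."""
        self.subj_freq.update(doc_subject_ids)
        self.doc_freq.update(c.subject_id for c in candidates)
        self.doc_count += 1

    def idf(self, subject_id):
        """Smoothed inverse document frequency of a candidate subject."""
        return math.log((self.doc_count + 1) /
                        (self.doc_freq[subject_id] + 1)) + 1

MLLMModel.train() would then call update() inside its document loop and idf() when building the feature matrix, which shortens train() and should lower both its size and the weighted method count of the class.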

"""Maui-like Lexical Matching backend"""

import collections
import math
from enum import IntEnum
from statistics import mean
import os.path
import joblib
import numpy as np
from rdflib import URIRef
from rdflib.namespace import SKOS
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import FunctionTransformer
from sklearn.ensemble import BaggingClassifier
from sklearn.tree import DecisionTreeClassifier
import annif.util
from annif.exception import NotInitializedException
from annif.suggestion import VectorSuggestionResult
from . import backend

Term = collections.namedtuple('Term', 'subject_id label is_pref')
Match = collections.namedtuple(
    'Match', 'subject_id is_pref n_tokens pos ambiguity')
Candidate = collections.namedtuple(
    'Candidate',
    'doc_length subject_id freq is_pref n_tokens ambiguity ' +
    'first_occ last_occ spread')

Feature = IntEnum(
    'Feature',
    'freq doc_freq subj_freq tfidf is_pref n_tokens ambiguity ' +
    'first_occ last_occ spread doc_length',
    start=0)


class TokenSet:
    """Represents a set of tokens (expressed as integer token IDs) that can
    be matched with another set of tokens. A TokenSet can optionally
    be associated with a subject from the vocabulary."""

    def __init__(self, tokens, subject_id=None, is_pref=False):
        self._tokens = set(tokens)
        self.subject_id = subject_id
        self.is_pref = is_pref

    def __len__(self):
        return len(self._tokens)

    def __iter__(self):
        return iter(self._tokens)

    def contains(self, other):
        """Returns True iff the tokens in the other TokenSet are all
        included within this TokenSet."""

        return other._tokens.issubset(self._tokens)

    def sample(self):
        """Return an arbitrary token from this TokenSet, or None if empty"""
        try:
            return next(iter(self._tokens))
        except StopIteration:
            return None


class TokenSetIndex:
    """A searchable index of TokenSets (representing vocabulary terms)"""

    def __init__(self):
        self._index = collections.defaultdict(set)

    def __len__(self):
        return len(self._index)

    def add(self, tset):
        """Add a TokenSet into this index"""
        token = tset.sample()
        if token is not None:
            self._index[token].add(tset)

    def search(self, tset):
        """Return the TokenSets that are contained in the given TokenSet.
        The matches are returned as a list of (TokenSet, ambiguity) pairs
        where ambiguity is an integer indicating the number of other TokenSets
        that also match the same tokens."""

        subj_tsets = {}
        subj_ambiguity = collections.Counter()

        for token in tset:
            for ts in self._index[token]:
                if not tset.contains(ts):
                    continue
                if ts.subject_id not in subj_tsets or \
                   not subj_tsets[ts.subject_id].is_pref:
                    subj_tsets[ts.subject_id] = ts

        for ts in subj_tsets.values():
            for other in subj_tsets.values():
                if ts == other:
                    continue
                if other.contains(ts):
                    subj_ambiguity.update([ts.subject_id])

        return [(ts, subj_ambiguity[ts.subject_id])
                for ts in subj_tsets.values()]


class MLLMModel:
    """Maui-like Lexical Matching model"""

    def _conflate_matches(self, matches, doc_length):
        subj_matches = collections.defaultdict(list)
        for match in matches:
            subj_matches[match.subject_id].append(match)
        return [
            Candidate(
                doc_length=doc_length,
                subject_id=subject_id,
                freq=len(matches) / doc_length,
                is_pref=mean((float(m.is_pref) for m in matches)),
                n_tokens=mean((m.n_tokens for m in matches)),
                ambiguity=mean((m.ambiguity for m in matches)),
                first_occ=matches[0].pos / doc_length,
                last_occ=matches[-1].pos / doc_length,
                spread=(matches[-1].pos - matches[0].pos) / doc_length
            )
            for subject_id, matches in subj_matches.items()]

    def _generate_candidates(self, text, analyzer):
        sentences = analyzer.tokenize_sentences(text)
        sent_tokens = self._vectorizer.transform(sentences)
        matches = []

        for sent_idx, token_matrix in enumerate(sent_tokens):
            tset = TokenSet(token_matrix.nonzero()[1])
            for ts, ambiguity in self._index.search(tset):
                matches.append(Match(subject_id=ts.subject_id,
                                     is_pref=ts.is_pref,
                                     n_tokens=len(ts),
                                     pos=sent_idx,
                                     ambiguity=ambiguity))

        return self._conflate_matches(matches, len(sentences))

    def _candidates_to_features(self, candidates):
        """Convert a list of Candidates to a NumPy feature matrix"""
        matrix = np.zeros((len(candidates), len(Feature)), dtype=np.float32)
        for idx, c in enumerate(candidates):
            subj = c.subject_id
            matrix[idx, Feature.freq] = c.freq
            matrix[idx, Feature.doc_freq] = self._doc_freq[subj]
            matrix[idx, Feature.subj_freq] = self._subj_freq.get(subj, 1) - 1
            matrix[idx, Feature.tfidf] = c.freq * self._idf[subj]
            matrix[idx, Feature.is_pref] = c.is_pref
            matrix[idx, Feature.n_tokens] = c.n_tokens
            matrix[idx, Feature.ambiguity] = c.ambiguity
            matrix[idx, Feature.first_occ] = c.first_occ
            matrix[idx, Feature.last_occ] = c.last_occ
            matrix[idx, Feature.spread] = c.spread
            matrix[idx, Feature.doc_length] = c.doc_length
        return matrix

    def train(self, corpus, vocab, analyzer, params):
        graph = vocab.as_graph()
        terms = []
        subject_ids = []
        for subj_id, (uri, pref, _) in enumerate(vocab.subjects):
            if pref is None:
                continue  # deprecated subject
            subject_ids.append(subj_id)
            terms.append(Term(subject_id=subj_id, label=pref, is_pref=True))
            alts = graph.preferredLabel(URIRef(uri),
                                        lang=params['language'],
                                        labelProperties=[SKOS.altLabel])
            for label, _ in alts:
                terms.append(Term(subject_id=subj_id,
                                  label=str(label),
                                  is_pref=False))

        self._vectorizer = CountVectorizer(
            binary=True,
            tokenizer=analyzer.tokenize_words
        )
        label_corpus = self._vectorizer.fit_transform((t.label for t in terms))

        self._index = TokenSetIndex()
        for term, label_matrix in zip(terms, label_corpus):
            tokens = label_matrix.nonzero()[1]
            tset = TokenSet(tokens, term.subject_id, term.is_pref)
            self._index.add(tset)

        # frequency of subjects (by id) in the generated candidates
        self._doc_freq = collections.Counter()
        # frequency of manually assigned subjects ("domain keyphraseness")
        self._subj_freq = collections.Counter()
        doc_count = 0
        train_X = []
        train_y = []
        for idx, doc in enumerate(corpus.documents):
            doc_subject_ids = [vocab.subjects.by_uri(uri)
                               for uri in doc.uris]
            self._subj_freq.update(doc_subject_ids)
            candidates = self._generate_candidates(doc.text, analyzer)
            self._doc_freq.update([c.subject_id for c in candidates])
            train_X += candidates
            train_y += [(c.subject_id in doc_subject_ids) for c in candidates]
            doc_count += 1

        # precalculate idf values for candidate subjects
        self._idf = collections.defaultdict(float)
        for subj_id in subject_ids:
            self._idf[subj_id] = math.log((doc_count + 1) /
                                          (self._doc_freq[subj_id] + 1)) + 1

        # define a sklearn pipeline with transformer and classifier
        # TODO: make hyperparameters configurable
        self._model = Pipeline(
            steps=[
                ('transformer', FunctionTransformer(
                    self._candidates_to_features)),
                ('classifier', BaggingClassifier(
                    DecisionTreeClassifier(
                        min_samples_leaf=int(params['min_samples_leaf']),
                        max_leaf_nodes=int(params['max_leaf_nodes'])
                    ), max_samples=float(params['max_samples'])))])
        # fit the model on the training corpus
        self._model.fit(train_X, train_y)

    def predict(self, text, analyzer):
        candidates = self._generate_candidates(text, analyzer)
        if not candidates:
            return []
        scores = self._model.predict_proba(candidates)
        subj_scores = [(score[1], c.subject_id)
                       for score, c in zip(scores, candidates)]
        return sorted(subj_scores, reverse=True)


class MLLMBackend(backend.AnnifBackend):
    """Maui-like Lexical Matching backend for Annif"""
    name = "mllm"
    needs_subject_index = True

    # defaults for uninitialized instances
    _model = None

    MODEL_FILE = 'model'

    DEFAULT_PARAMETERS = {
        'min_samples_leaf': 20,
        'max_leaf_nodes': 1000,
        'max_samples': 0.9
    }

    def default_params(self):
        params = backend.AnnifBackend.DEFAULT_PARAMETERS.copy()
        params.update(self.DEFAULT_PARAMETERS)
        return params

    def initialize(self):
        if self._model is None:
            path = os.path.join(self.datadir, self.MODEL_FILE)
            self.debug('loading model from {}'.format(path))
            if os.path.exists(path):
                self._model = joblib.load(path)
            else:
                raise NotInitializedException(
                    'model {} not found'.format(path),
                    backend_id=self.backend_id)

    def _train(self, corpus, params):
        # TODO: check for "cached" corpus
        self.info('starting train')
        self._model = MLLMModel()
        self._model.train(
            corpus,
            self.project.vocab,
            self.project.analyzer,
            params)
        self.info('saving model')
        annif.util.atomic_save(
            self._model,
            self.datadir,
            self.MODEL_FILE,
            method=joblib.dump)

    def _suggest(self, text, params):
        vector = np.zeros(len(self.project.subjects), dtype=np.float32)
        for score, subject_id in self._model.predict(text,
                                                     self.project.analyzer):
            vector[subject_id] = score
        result = VectorSuggestionResult(vector)
        return result.filter(self.project.subjects,
                             limit=int(params['limit']))
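To make the matching behaviour concrete, here is a small usage sketch of the TokenSet and TokenSetIndex classes listed above; the token and subject IDs are made up for illustration.

# Usage sketch with made-up token and subject IDs; assumes TokenSet and
# TokenSetIndex can be imported from annif.backend.mllm as listed above.
from annif.backend.mllm import TokenSet, TokenSetIndex

index = TokenSetIndex()
# Two vocabulary terms: subject 1 covers tokens {1, 2}, subject 2 covers {1, 2, 3}.
index.add(TokenSet([1, 2], subject_id=1, is_pref=True))
index.add(TokenSet([1, 2, 3], subject_id=2, is_pref=True))

# A sentence containing tokens 1, 2 and 3 matches both terms; the shorter term
# gets ambiguity 1 because it is contained in the longer matching term.
for tset, ambiguity in index.search(TokenSet([1, 2, 3])):
    print(tset.subject_id, ambiguity)   # prints "1 1" and "2 0" (in some order)

This is the same containment-based matching that MLLMModel._generate_candidates() performs for each sentence before conflating the matches into Candidates.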