annif.lexical.mllm.MLLMModel.save()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0

Metric    Value
cc        1
eloc      2
nop       2
dl        0
loc       2
rs        10
c         0
b         0
f         0
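For context, the figures above describe the two-line save() method that appears near the end of the module listing below; cc, nop and loc presumably stand for cyclomatic complexity, number of parameters (self and filename) and line counts as reported by the analysis tool. The method itself simply delegates to joblib:

def save(self, filename: str) -> list[str]:
    return joblib.dump(self, filename)

The full source of the module follows.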
"""MLLM (Maui-like Lexical Matching) model for Annif"""

from __future__ import annotations

import collections
import math
from enum import IntEnum
from statistics import mean
from typing import TYPE_CHECKING, Any

import joblib
import numpy as np
from rdflib.namespace import SKOS
from sklearn.ensemble import BaggingClassifier
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.tree import DecisionTreeClassifier

import annif.parallel
import annif.util
from annif.exception import OperationFailedException
from annif.lexical.tokenset import TokenSet, TokenSetIndex
from annif.lexical.util import (
    get_subject_labels,
    make_collection_matrix,
    make_relation_matrix,
)

if TYPE_CHECKING:
    from collections import defaultdict

    from rdflib.graph import Graph
    from rdflib.term import URIRef

    from annif.analyzer import Analyzer
    from annif.corpus.document import DocumentCorpus
    from annif.vocab import AnnifVocabulary

Term = collections.namedtuple("Term", "subject_id label is_pref")

Match = collections.namedtuple("Match", "subject_id is_pref n_tokens pos ambiguity")

Candidate = collections.namedtuple(
    "Candidate",
    "doc_length subject_id freq is_pref n_tokens ambiguity "
    + "first_occ last_occ spread",
)

ModelData = collections.namedtuple(
    "ModelData", "broader narrower related collection " + "doc_freq subj_freq idf"
)

Feature = IntEnum(
    "Feature",
    "freq doc_freq subj_freq tfidf is_pref n_tokens ambiguity "
    + "first_occ last_occ spread doc_length "
    + "broader narrower related collection",
    start=0,
)


def conflate_matches(matches: list[Match], doc_length: int) -> list[Candidate]:
    subj_matches = collections.defaultdict(list)
    for match in matches:
        subj_matches[match.subject_id].append(match)
    return [
        Candidate(
            doc_length=doc_length,
            subject_id=subject_id,
            freq=len(matches) / doc_length,
            is_pref=mean((float(m.is_pref) for m in matches)),
            n_tokens=mean((m.n_tokens for m in matches)),
            ambiguity=mean((m.ambiguity for m in matches)),
            first_occ=matches[0].pos / doc_length,
            last_occ=matches[-1].pos / doc_length,
            spread=(matches[-1].pos - matches[0].pos) / doc_length,
        )
        for subject_id, matches in subj_matches.items()
    ]


def generate_candidates(
    text: str,
    analyzer: Analyzer,
    vectorizer: CountVectorizer,
    index: TokenSetIndex,
) -> list[Candidate]:
    sentences = analyzer.tokenize_sentences(text)
    sent_tokens = vectorizer.transform(sentences)
    matches = []

    for sent_idx, token_matrix in enumerate(sent_tokens):
        tset = TokenSet(token_matrix.nonzero()[1])
        for ts, ambiguity in index.search(tset):
            matches.append(
                Match(
                    subject_id=ts.subject_id,
                    is_pref=ts.is_pref,
                    n_tokens=len(ts),
                    pos=sent_idx,
                    ambiguity=ambiguity,
                )
            )

    return conflate_matches(matches, len(sentences))


def candidates_to_features(
    candidates: list[Candidate], mdata: "ModelData"
) -> np.ndarray:
    """Convert a list of Candidates to a NumPy feature matrix"""

    matrix = np.zeros((len(candidates), len(Feature)), dtype=np.float32)
    c_ids = [c.subject_id for c in candidates]
    c_vec = np.zeros(mdata.related.shape[0], dtype=bool)
    c_vec[c_ids] = True
    broader = mdata.broader.multiply(c_vec).sum(axis=1)
    narrower = mdata.narrower.multiply(c_vec).sum(axis=1)
    related = mdata.related.multiply(c_vec).sum(axis=1)
    collection = mdata.collection.multiply(c_vec).T.dot(mdata.collection).sum(axis=0)
    for idx, c in enumerate(candidates):
        subj = c.subject_id
        matrix[idx, Feature.freq] = c.freq
        matrix[idx, Feature.doc_freq] = mdata.doc_freq[subj]
        matrix[idx, Feature.subj_freq] = mdata.subj_freq.get(subj, 1) - 1
        matrix[idx, Feature.tfidf] = c.freq * mdata.idf[subj]
        matrix[idx, Feature.is_pref] = c.is_pref
        matrix[idx, Feature.n_tokens] = c.n_tokens
        matrix[idx, Feature.ambiguity] = c.ambiguity
        matrix[idx, Feature.first_occ] = c.first_occ
        matrix[idx, Feature.last_occ] = c.last_occ
        matrix[idx, Feature.spread] = c.spread
        matrix[idx, Feature.doc_length] = c.doc_length
        matrix[idx, Feature.broader] = broader[subj, 0] / len(c_ids)
        matrix[idx, Feature.narrower] = narrower[subj, 0] / len(c_ids)
        matrix[idx, Feature.related] = related[subj, 0] / len(c_ids)
        matrix[idx, Feature.collection] = collection[0, subj] / len(c_ids)
    return matrix


class MLLMCandidateGenerator(annif.parallel.BaseWorker):
    @classmethod
    def generate_candidates(cls, doc_subject_set, text):
        candidates = generate_candidates(text, **cls.args)  # pragma: no cover
        return doc_subject_set, candidates  # pragma: no cover


class MLLMFeatureConverter(annif.parallel.BaseWorker):
    @classmethod
    def candidates_to_features(cls, candidates):
        return candidates_to_features(candidates, **cls.args)  # pragma: no cover


class MLLMModel:
    """Maui-like Lexical Matching model"""

    def generate_candidates(self, text: str, analyzer: Analyzer) -> list[Candidate]:
        return generate_candidates(text, analyzer, self._vectorizer, self._index)

    @property
    def _model_data(self) -> ModelData:
        return ModelData(
            broader=self._broader_matrix,
            narrower=self._narrower_matrix,
            related=self._related_matrix,
            collection=self._collection_matrix,
            doc_freq=self._doc_freq,
            subj_freq=self._subj_freq,
            idf=self._idf,
        )

    def _candidates_to_features(self, candidates: list[Candidate]) -> np.ndarray:
        return candidates_to_features(candidates, self._model_data)

    @staticmethod
    def _get_label_props(params: dict[str, Any]) -> tuple[list[URIRef], list[URIRef]]:
        pref_label_props = [SKOS.prefLabel]

        if annif.util.boolean(params["use_hidden_labels"]):
            nonpref_label_props = [SKOS.altLabel, SKOS.hiddenLabel]
        else:
            nonpref_label_props = [SKOS.altLabel]

        return (pref_label_props, nonpref_label_props)

    def _prepare_terms(
        self,
        graph: Graph,
        vocab: AnnifVocabulary,
        params: dict[str, Any],
    ) -> tuple[list[Term], list[int]]:
        pref_label_props, nonpref_label_props = self._get_label_props(params)

        terms = []
        subject_ids = []
        for subj_id, subject in vocab.subjects.active:
            subject_ids.append(subj_id)

            for label in get_subject_labels(
                graph, subject.uri, pref_label_props, params["language"]
            ):
                terms.append(Term(subject_id=subj_id, label=label, is_pref=True))

            for label in get_subject_labels(
                graph, subject.uri, nonpref_label_props, params["language"]
            ):
                terms.append(Term(subject_id=subj_id, label=label, is_pref=False))

        return (terms, subject_ids)

    def _prepare_relations(self, graph: Graph, vocab: AnnifVocabulary) -> None:
        self._broader_matrix = make_relation_matrix(graph, vocab, SKOS.broader)
        self._narrower_matrix = make_relation_matrix(graph, vocab, SKOS.narrower)
        self._related_matrix = make_relation_matrix(graph, vocab, SKOS.related)
        self._collection_matrix = make_collection_matrix(graph, vocab)

    def _prepare_train_index(
        self,
        vocab: AnnifVocabulary,
        analyzer: Analyzer,
        params: dict[str, Any],
    ) -> list[int]:
        graph = vocab.as_graph()
        terms, subject_ids = self._prepare_terms(graph, vocab, params)
        self._prepare_relations(graph, vocab)

        self._vectorizer = CountVectorizer(
            binary=True, tokenizer=analyzer.tokenize_words, token_pattern=None
        )
        label_corpus = self._vectorizer.fit_transform((t.label for t in terms))

        # frequency of each token used in labels - how rare each word is
        token_freq = np.bincount(label_corpus.indices, minlength=label_corpus.shape[1])

        self._index = TokenSetIndex()
        for term, label_matrix in zip(terms, label_corpus):
            tokens = label_matrix.nonzero()[1]
            # sort tokens by frequency - use the rarest token as index key
            tokens = sorted(tokens, key=token_freq.__getitem__)
            tset = TokenSet(tokens, term.subject_id, term.is_pref)
            self._index.add(tset)

        return subject_ids

    def _prepare_train_data(
        self, corpus: DocumentCorpus, analyzer: Analyzer, n_jobs: int
    ) -> tuple[list[list[Candidate]], list[bool]]:
        # frequency of subjects (by id) in the generated candidates
        self._doc_freq = collections.Counter()
        # frequency of manually assigned subjects ("domain keyphraseness")
        self._subj_freq = collections.Counter()
        train_x = []
        train_y = []

        jobs, pool_class = annif.parallel.get_pool(n_jobs)

        cg_args = {
            "analyzer": analyzer,
            "vectorizer": self._vectorizer,
            "index": self._index,
        }

        with pool_class(
            jobs, initializer=MLLMCandidateGenerator.init, initargs=(cg_args,)
        ) as pool:
            params = ((doc.subject_set, doc.text) for doc in corpus.documents)
            for doc_subject_ids, candidates in pool.starmap(
                MLLMCandidateGenerator.generate_candidates, params, 10
            ):
                self._subj_freq.update(doc_subject_ids)
                self._doc_freq.update([c.subject_id for c in candidates])
                train_x.append(candidates)
                train_y += [(c.subject_id in doc_subject_ids) for c in candidates]

        return (train_x, train_y)

    def _calculate_idf(
        self, subject_ids: list[int], doc_count: int
    ) -> defaultdict[int, float]:
        idf = collections.defaultdict(float)
        for subj_id in subject_ids:
            idf[subj_id] = math.log((doc_count + 1) / (self._doc_freq[subj_id] + 1)) + 1

        return idf

    def _prepare_features(
        self, train_x: list[list[Candidate]], n_jobs: int
    ) -> list[np.ndarray]:
        fc_args = {"mdata": self._model_data}
        jobs, pool_class = annif.parallel.get_pool(n_jobs)

        with pool_class(
            jobs, initializer=MLLMFeatureConverter.init, initargs=(fc_args,)
        ) as pool:
            features = pool.map(
                MLLMFeatureConverter.candidates_to_features, train_x, 10
            )

        return features

    def prepare_train(
        self,
        corpus: DocumentCorpus,
        vocab: AnnifVocabulary,
        analyzer: Analyzer,
        params: dict[str, Any],
        n_jobs: int,
    ) -> tuple[np.ndarray, np.ndarray]:
        # create an index from the vocabulary terms
        subject_ids = self._prepare_train_index(vocab, analyzer, params)

        # convert the corpus into train data
        train_x, train_y = self._prepare_train_data(corpus, analyzer, n_jobs)

        # precalculate idf values for all candidate subjects
        self._idf = self._calculate_idf(subject_ids, len(train_x))

        # convert the train data into feature values
        features = self._prepare_features(train_x, n_jobs)

        return (np.vstack(features), np.array(train_y))

    def _create_classifier(self, params: dict[str, Any]) -> BaggingClassifier:
        return BaggingClassifier(
            DecisionTreeClassifier(
                min_samples_leaf=int(params["min_samples_leaf"]),
                max_leaf_nodes=int(params["max_leaf_nodes"]),
            ),
            max_samples=float(params["max_samples"]),
        )

    def train(
        self,
        train_x: np.ndarray | list[tuple[int, int]],
        train_y: list[bool] | np.ndarray,
        params: dict[str, Any],
    ) -> None:
        # fit the model on the training corpus
        self._classifier = self._create_classifier(params)
        self._classifier.fit(train_x, train_y)
        # sanity check: verify that the classifier has seen both classes
        if self._classifier.n_classes_ != 2:
            raise OperationFailedException(
                "Unable to create classifier: "
                + "Not enough positive and negative examples "
                + "in the training data. Please check that your training "
                + "data matches your vocabulary."
            )

    def _prediction_to_list(
        self, scores: np.ndarray, candidates: list[Candidate]
    ) -> list[tuple[np.float64, int]]:
        subj_scores = [(score[1], c.subject_id) for score, c in zip(scores, candidates)]
        return sorted(subj_scores, reverse=True)

    def predict(self, candidates: list[Candidate]) -> list[tuple[np.float64, int]]:
        if not candidates:
            return []
        features = self._candidates_to_features(candidates)
        scores = self._classifier.predict_proba(features)
        return self._prediction_to_list(scores, candidates)

    def save(self, filename: str) -> list[str]:
        return joblib.dump(self, filename)

    @staticmethod
    def load(filename: str) -> MLLMModel:
        return joblib.load(filename)
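
As a usage sketch (not part of the module above): save() delegates to joblib.dump and returns the list of file names written, while the static load() method unpickles a previously saved model. The names model, analyzer and document_text below are hypothetical placeholders; in Annif these objects are normally created and wired up by the MLLM backend rather than used directly.

from annif.lexical.mllm import MLLMModel

# Minimal sketch, assuming `model` is an already trained MLLMModel, `analyzer`
# is the same Annif analyzer used during training, and `document_text` is a str.
saved_files = model.save("mllm-model.gz")    # joblib.dump writes the pickle
restored = MLLMModel.load("mllm-model.gz")   # static method, returns the model

candidates = restored.generate_candidates(document_text, analyzer)
for score, subject_id in restored.predict(candidates):  # sorted best first
    print(subject_id, score)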