annif.lexical.mllm.MLLMModel._get_label_props()   grade A
last analyzed

Complexity:  Conditions 2
Size:        Total Lines 10, Code Lines 7
Duplication: Lines 0, Ratio 0 %
Importance:  Changes 0

Metric  Value
cc      2
eloc    7
nop     1
dl      0
loc     10
rs      10
c       0
b       0
f       0
"""MLLM (Maui-like Lexical Matching) model for Annif"""

from __future__ import annotations

import collections
import math
from enum import IntEnum
from statistics import mean
from typing import TYPE_CHECKING, Any

import joblib
import numpy as np
from rdflib.namespace import SKOS
from sklearn.ensemble import BaggingClassifier
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.tree import DecisionTreeClassifier

import annif.parallel
import annif.util
from annif.exception import OperationFailedException
from annif.lexical.tokenset import TokenSet, TokenSetIndex
from annif.lexical.util import (
    get_subject_labels,
    make_collection_matrix,
    make_relation_matrix,
)

if TYPE_CHECKING:
    from collections import defaultdict

    from rdflib.graph import Graph
    from rdflib.term import URIRef

    from annif.analyzer import Analyzer
    from annif.corpus.document import DocumentCorpus
    from annif.vocab import AnnifVocabulary

Term = collections.namedtuple("Term", "subject_id label is_pref")

Match = collections.namedtuple("Match", "subject_id is_pref n_tokens pos ambiguity")

Candidate = collections.namedtuple(
    "Candidate",
    "doc_length subject_id freq is_pref n_tokens ambiguity "
    + "first_occ last_occ spread",
)

ModelData = collections.namedtuple(
    "ModelData", "broader narrower related collection " + "doc_freq subj_freq idf"
)

Feature = IntEnum(
    "Feature",
    "freq doc_freq subj_freq tfidf is_pref n_tokens ambiguity "
    + "first_occ last_occ spread doc_length "
    + "broader narrower related collection",
    start=0,
)

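A minimal sketch (illustrative, not part of the module): because Feature is declared with start=0, its members double as column indices into the matrix built by candidates_to_features(), which therefore has exactly len(Feature) columns.

    from enum import IntEnum

    # same construction as above, shortened to three members (hypothetical demo enum)
    Demo = IntEnum("Demo", "freq doc_freq tfidf", start=0)

    assert int(Demo.freq) == 0                      # first member indexes column 0
    assert [f.name for f in Demo] == ["freq", "doc_freq", "tfidf"]
    assert len(Demo) == 3                           # one matrix column per member
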
def conflate_matches(matches: list[Match], doc_length: int) -> list[Candidate]:
    subj_matches = collections.defaultdict(list)
    for match in matches:
        subj_matches[match.subject_id].append(match)
    return [
        Candidate(
            doc_length=doc_length,
            subject_id=subject_id,
            freq=len(matches) / doc_length,
            is_pref=mean((float(m.is_pref) for m in matches)),
            n_tokens=mean((m.n_tokens for m in matches)),
            ambiguity=mean((m.ambiguity for m in matches)),
            first_occ=matches[0].pos / doc_length,
            last_occ=matches[-1].pos / doc_length,
            spread=(matches[-1].pos - matches[0].pos) / doc_length,
        )
        for subject_id, matches in subj_matches.items()
    ]

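A small usage sketch (illustrative only, assuming the definitions above are in scope): two matches of the same subject in a ten-sentence document are conflated into a single Candidate whose frequency and positional features are normalized by document length.

    ms = [
        Match(subject_id=42, is_pref=True, n_tokens=2, pos=1, ambiguity=1),
        Match(subject_id=42, is_pref=False, n_tokens=1, pos=7, ambiguity=2),
    ]
    (cand,) = conflate_matches(ms, doc_length=10)
    assert cand.freq == 0.2                         # 2 matches / 10 sentences
    assert cand.is_pref == 0.5                      # mean of True and False
    assert cand.first_occ == 0.1 and cand.last_occ == 0.7
    assert cand.spread == 0.6                       # (7 - 1) / 10
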
def generate_candidates(
    text: str,
    analyzer: Analyzer,
    vectorizer: CountVectorizer,
    index: TokenSetIndex,
) -> list[Candidate]:
    sentences = analyzer.tokenize_sentences(text)
    sent_tokens = vectorizer.transform(sentences)
    matches = []

    for sent_idx, token_matrix in enumerate(sent_tokens):
        tset = TokenSet(token_matrix.nonzero()[1])
        for ts, ambiguity in index.search(tset):
            matches.append(
                Match(
                    subject_id=ts.subject_id,
                    is_pref=ts.is_pref,
                    n_tokens=len(ts),
                    pos=sent_idx,
                    ambiguity=ambiguity,
                )
            )

    return conflate_matches(matches, len(sentences))


def candidates_to_features(
    candidates: list[Candidate], mdata: "ModelData"
) -> np.ndarray:
    """Convert a list of Candidates to a NumPy feature matrix"""

    matrix = np.zeros((len(candidates), len(Feature)), dtype=np.float32)
    c_ids = [c.subject_id for c in candidates]
    c_vec = np.zeros(mdata.related.shape[0], dtype=bool)
    c_vec[c_ids] = True
    broader = mdata.broader.multiply(c_vec).sum(axis=1)
    narrower = mdata.narrower.multiply(c_vec).sum(axis=1)
    related = mdata.related.multiply(c_vec).sum(axis=1)
    collection = mdata.collection.multiply(c_vec).T.dot(mdata.collection).sum(axis=0)
    for idx, c in enumerate(candidates):
        subj = c.subject_id
        matrix[idx, Feature.freq] = c.freq
        matrix[idx, Feature.doc_freq] = mdata.doc_freq[subj]
        matrix[idx, Feature.subj_freq] = mdata.subj_freq.get(subj, 1) - 1
        matrix[idx, Feature.tfidf] = c.freq * mdata.idf[subj]
        matrix[idx, Feature.is_pref] = c.is_pref
        matrix[idx, Feature.n_tokens] = c.n_tokens
        matrix[idx, Feature.ambiguity] = c.ambiguity
        matrix[idx, Feature.first_occ] = c.first_occ
        matrix[idx, Feature.last_occ] = c.last_occ
        matrix[idx, Feature.spread] = c.spread
        matrix[idx, Feature.doc_length] = c.doc_length
        matrix[idx, Feature.broader] = broader[subj, 0] / len(c_ids)
        matrix[idx, Feature.narrower] = narrower[subj, 0] / len(c_ids)
        matrix[idx, Feature.related] = related[subj, 0] / len(c_ids)
        matrix[idx, Feature.collection] = collection[0, subj] / len(c_ids)
    return matrix


def create_classifier(params: dict[str, Any]) -> BaggingClassifier:
    return BaggingClassifier(
        DecisionTreeClassifier(
            min_samples_leaf=int(params["min_samples_leaf"]),
            max_leaf_nodes=int(params["max_leaf_nodes"]),
        ),
        max_samples=float(params["max_samples"]),
    )

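A brief sketch (illustrative only): create_classifier() reads exactly three hyperparameters from the params dict and casts them with int()/float(), so string values work directly. The values below are hypothetical.

    clf = create_classifier(
        {"min_samples_leaf": "20", "max_leaf_nodes": "1000", "max_samples": "0.9"}
    )
    print(type(clf).__name__)   # BaggingClassifier, ready for .fit(train_x, train_y)
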
def prediction_to_list(
    scores: np.ndarray, candidates: list[Candidate]
) -> list[tuple[np.float64, int]]:
    subj_scores = [(score[1], c.subject_id) for score, c in zip(scores, candidates)]
    return sorted(subj_scores, reverse=True)

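A minimal sketch (illustrative only): predict_proba() yields one (negative, positive) probability row per candidate; prediction_to_list() keeps the positive-class column and sorts subject ids by descending score.

    scores = np.array([[0.9, 0.1], [0.3, 0.7]])
    cands = [
        Candidate(doc_length=10, subject_id=1, freq=0.1, is_pref=1.0, n_tokens=1,
                  ambiguity=1.0, first_occ=0.1, last_occ=0.1, spread=0.0),
        Candidate(doc_length=10, subject_id=2, freq=0.2, is_pref=0.5, n_tokens=2,
                  ambiguity=1.5, first_occ=0.1, last_occ=0.7, spread=0.6),
    ]
    assert prediction_to_list(scores, cands) == [(0.7, 2), (0.1, 1)]
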
class MLLMCandidateGenerator(annif.parallel.BaseWorker):
    @classmethod
    def generate_candidates(cls, doc_subject_set, text):
        candidates = generate_candidates(text, **cls.args)  # pragma: no cover
        return doc_subject_set, candidates  # pragma: no cover


class MLLMFeatureConverter(annif.parallel.BaseWorker):
    @classmethod
    def candidates_to_features(cls, candidates):
        return candidates_to_features(candidates, **cls.args)  # pragma: no cover

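These two BaseWorker subclasses let the heavyweight shared arguments (analyzer, vectorizer and token set index, or the model data) be installed once per worker process through the pool initializer, so only the per-document payload travels through the task queue. A rough sketch of the pattern, assuming BaseWorker.init() stores the passed dict on the class (as the initargs usage in _prepare_train_data below implies); analyzer, vectorizer, index and docs are hypothetical stand-ins.

    cg_args = {"analyzer": analyzer, "vectorizer": vectorizer, "index": index}
    jobs, pool_class = annif.parallel.get_pool(4)
    with pool_class(
        jobs, initializer=MLLMCandidateGenerator.init, initargs=(cg_args,)
    ) as pool:
        for subject_set, candidates in pool.starmap(
            MLLMCandidateGenerator.generate_candidates,
            ((doc.subject_set, doc.text) for doc in docs),
            10,
        ):
            ...
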
class MLLMModel:
    """Maui-like Lexical Matching model"""

    def generate_candidates(self, text: str, analyzer: Analyzer) -> list[Candidate]:
        return generate_candidates(text, analyzer, self._vectorizer, self._index)

    @property
    def _model_data(self) -> ModelData:
        return ModelData(
            broader=self._broader_matrix,
            narrower=self._narrower_matrix,
            related=self._related_matrix,
            collection=self._collection_matrix,
            doc_freq=self._doc_freq,
            subj_freq=self._subj_freq,
            idf=self._idf,
        )

    def _candidates_to_features(self, candidates: list[Candidate]) -> np.ndarray:
        return candidates_to_features(candidates, self._model_data)

    @staticmethod
    def _get_label_props(params: dict[str, Any]) -> tuple[list[URIRef], list[URIRef]]:
        pref_label_props = [SKOS.prefLabel]

        if annif.util.boolean(params["use_hidden_labels"]):
            nonpref_label_props = [SKOS.altLabel, SKOS.hiddenLabel]
        else:
            nonpref_label_props = [SKOS.altLabel]

        return (pref_label_props, nonpref_label_props)

    def _prepare_terms(
        self,
        graph: Graph,
        vocab: AnnifVocabulary,
        params: dict[str, Any],
    ) -> tuple[list[Term], list[int]]:
        pref_label_props, nonpref_label_props = self._get_label_props(params)

        terms = []
        subject_ids = []
        for subj_id, subject in vocab.subjects.active:
            subject_ids.append(subj_id)

            for label in get_subject_labels(
                graph, subject.uri, pref_label_props, params["language"]
            ):
                terms.append(Term(subject_id=subj_id, label=label, is_pref=True))

            for label in get_subject_labels(
                graph, subject.uri, nonpref_label_props, params["language"]
            ):
                terms.append(Term(subject_id=subj_id, label=label, is_pref=False))

        return (terms, subject_ids)

    def _prepare_relations(self, graph: Graph, vocab: AnnifVocabulary) -> None:
        self._broader_matrix = make_relation_matrix(graph, vocab, SKOS.broader)
        self._narrower_matrix = make_relation_matrix(graph, vocab, SKOS.narrower)
        self._related_matrix = make_relation_matrix(graph, vocab, SKOS.related)
        self._collection_matrix = make_collection_matrix(graph, vocab)

    def _prepare_train_index(
        self,
        vocab: AnnifVocabulary,
        analyzer: Analyzer,
        params: dict[str, Any],
    ) -> list[int]:
        graph = vocab.as_graph()
        terms, subject_ids = self._prepare_terms(graph, vocab, params)
        self._prepare_relations(graph, vocab)

        self._vectorizer = CountVectorizer(
            binary=True, tokenizer=analyzer.tokenize_words, token_pattern=None
        )
        label_corpus = self._vectorizer.fit_transform((t.label for t in terms))

        # frequency of each token used in labels - how rare each word is
        token_freq = np.bincount(label_corpus.indices, minlength=label_corpus.shape[1])

        self._index = TokenSetIndex()
        for term, label_matrix in zip(terms, label_corpus):
            tokens = label_matrix.nonzero()[1]
            # sort tokens by frequency - use the rarest token as index key
            tokens = sorted(tokens, key=token_freq.__getitem__)
            tset = TokenSet(tokens, term.subject_id, term.is_pref)
            self._index.add(tset)

        return subject_ids

    def _prepare_train_data(
        self, corpus: DocumentCorpus, analyzer: Analyzer, n_jobs: int
    ) -> tuple[list[list[Candidate]], list[bool]]:
        # frequency of subjects (by id) in the generated candidates
        self._doc_freq = collections.Counter()
        # frequency of manually assigned subjects ("domain keyphraseness")
        self._subj_freq = collections.Counter()
        train_x = []
        train_y = []

        jobs, pool_class = annif.parallel.get_pool(n_jobs)

        cg_args = {
            "analyzer": analyzer,
            "vectorizer": self._vectorizer,
            "index": self._index,
        }

        with pool_class(
            jobs, initializer=MLLMCandidateGenerator.init, initargs=(cg_args,)
        ) as pool:
            params = ((doc.subject_set, doc.text) for doc in corpus.documents)
            for doc_subject_ids, candidates in pool.starmap(
                MLLMCandidateGenerator.generate_candidates, params, 10
            ):
                self._subj_freq.update(doc_subject_ids)
                self._doc_freq.update([c.subject_id for c in candidates])
                train_x.append(candidates)
                train_y += [(c.subject_id in doc_subject_ids) for c in candidates]

        return (train_x, train_y)

    def _calculate_idf(
        self, subject_ids: list[int], doc_count: int
    ) -> defaultdict[int, float]:
        idf = collections.defaultdict(float)
        for subj_id in subject_ids:
            idf[subj_id] = math.log((doc_count + 1) / (self._doc_freq[subj_id] + 1)) + 1

        return idf

    def _prepare_features(
        self, train_x: list[list[Candidate]], n_jobs: int
    ) -> list[np.ndarray]:
        fc_args = {"mdata": self._model_data}
        jobs, pool_class = annif.parallel.get_pool(n_jobs)

        with pool_class(
            jobs, initializer=MLLMFeatureConverter.init, initargs=(fc_args,)
        ) as pool:
            features = pool.map(
                MLLMFeatureConverter.candidates_to_features, train_x, 10
            )

        return features

    def prepare_train(
        self,
        corpus: DocumentCorpus,
        vocab: AnnifVocabulary,
        analyzer: Analyzer,
        params: dict[str, Any],
        n_jobs: int,
    ) -> tuple[np.ndarray, np.ndarray]:
        # create an index from the vocabulary terms
        subject_ids = self._prepare_train_index(vocab, analyzer, params)

        # convert the corpus into train data
        train_x, train_y = self._prepare_train_data(corpus, analyzer, n_jobs)

        # precalculate idf values for all candidate subjects
        self._idf = self._calculate_idf(subject_ids, len(train_x))

        # convert the train data into feature values
        features = self._prepare_features(train_x, n_jobs)

        return (np.vstack(features), np.array(train_y))

    def train(
        self,
        train_x: np.ndarray | list[tuple[int, int]],
        train_y: list[bool] | np.ndarray,
        params: dict[str, Any],
    ) -> None:
        # fit the model on the training corpus
        self._classifier = create_classifier(params)
        self._classifier.fit(train_x, train_y)
        # sanity check: verify that the classifier has seen both classes
        if self._classifier.n_classes_ != 2:
            raise OperationFailedException(
                "Unable to create classifier: "
                + "Not enough positive and negative examples "
                + "in the training data. Please check that your training "
                + "data matches your vocabulary."
            )

    def predict(self, candidates: list[Candidate]) -> list[tuple[np.float64, int]]:
        if not candidates:
            return []
        features = self._candidates_to_features(candidates)
        scores = self._classifier.predict_proba(features)
        return prediction_to_list(scores, candidates)

    def save(self, filename: str) -> list[str]:
        return joblib.dump(self, filename)

    @staticmethod
    def load(filename: str) -> MLLMModel:
        return joblib.load(filename)
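
An end-to-end usage sketch (illustrative only): in Annif itself the mllm backend wires up the corpus, vocabulary and analyzer, so the objects and parameter values below are hypothetical stand-ins; the method calls mirror the class above.

    # corpus: DocumentCorpus, vocab: AnnifVocabulary, analyzer: Analyzer (assumed to exist)
    params = {
        "language": "en",
        "use_hidden_labels": "False",
        "min_samples_leaf": "20",
        "max_leaf_nodes": "1000",
        "max_samples": "0.9",
    }

    model = MLLMModel()
    train_x, train_y = model.prepare_train(corpus, vocab, analyzer, params, n_jobs=1)
    model.train(train_x, train_y, params)

    candidates = model.generate_candidates("Some document text ...", analyzer)
    for score, subject_id in model.predict(candidates):
        print(subject_id, score)

    model.save("mllm-model.gz")            # joblib dump to disk
    model = MLLMModel.load("mllm-model.gz")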