Passed
Push — main ( da1836...1db6a8 )
by Osma
07:26 queued 04:14
created

annif.backend.mllm.MLLMBackend._suggest()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 4
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
nop 3
1
"""Maui-like Lexical Matching backend"""
2
3
from __future__ import annotations
4
5
import os.path
6
from typing import TYPE_CHECKING, Any
7
8
import joblib
9
import numpy as np
10
11
import annif.eval
12
import annif.util
13
from annif.exception import NotInitializedException, NotSupportedException
14
from annif.lexical.mllm import (
15
    MLLMModel,
16
    candidates_to_features,
17
    create_classifier,
18
    prediction_to_list,
19
)
20
from annif.suggestion import vector_to_suggestions
21
22
from . import hyperopt
23
24
if TYPE_CHECKING:
25
    from collections.abc import Iterator
26
27
    from optuna.study.study import Study
28
    from optuna.trial import Trial
29
30
    from annif.backend.hyperopt import HPRecommendation
31
    from annif.corpus import Document, DocumentCorpus
32
    from annif.lexical.mllm import Candidate
33
    from annif.vocab import SubjectIndex
34
35
36
def prediction_to_result(
    prediction: list[tuple[np.float64, int]],
    params: dict[str, Any],
    subject_index: SubjectIndex,
) -> Iterator:
    """Convert (score, subject ID) pairs into a limited set of suggestions.

    A float32 score vector with one slot per entry of ``subject_index`` is
    created, zero-filled, and then populated at each subject ID found in
    ``prediction``. The vector is handed to ``vector_to_suggestions``
    together with ``params["limit"]`` converted to ``int``.
    """
    score_vector = np.zeros(len(subject_index), dtype=np.float32)
    for weight, sid in prediction:
        score_vector[sid] = weight
    return vector_to_suggestions(score_vector, int(params["limit"]))
45
46
47
class MLLMHPObjective(hyperopt.HPObjective):
    """Objective function of the MLLM hyperparameter optimizer"""

    @classmethod
    def objective(cls, trial: Trial, args) -> float:
        """Train a classifier with trial-suggested settings and score it.

        ``args`` is indexed by the keys "train_x", "train_y",
        "subject_index", "gold_subjects", "candidates", "model_data" and
        "metric". The returned value is the entry for ``args["metric"]`` in
        the evaluation batch results.
        """
        hyperparams = {
            "min_samples_leaf": trial.suggest_int("min_samples_leaf", 5, 30),
            "max_leaf_nodes": trial.suggest_int("max_leaf_nodes", 100, 2000),
            "max_samples": trial.suggest_float("max_samples", 0.5, 1.0),
            "limit": 100,
        }
        classifier = create_classifier(hyperparams)
        classifier.fit(args["train_x"], args["train_y"])

        batch = annif.eval.EvaluationBatch(args["subject_index"])
        for gold, doc_candidates in zip(args["gold_subjects"], args["candidates"]):
            # documents without any candidates are evaluated with an
            # empty ranking instead of being passed to the classifier
            ranking = []
            if doc_candidates:
                features = candidates_to_features(doc_candidates, args["model_data"])
                probabilities = classifier.predict_proba(features)
                ranking = prediction_to_list(probabilities, doc_candidates)
            suggestions = prediction_to_result(
                ranking, hyperparams, args["subject_index"]
            )
            batch.evaluate_many([suggestions], [gold])
        scores = batch.results(metrics=[args["metric"]])
        return scores[args["metric"]]
73
74
75
class MLLMOptimizer(hyperopt.HyperparameterOptimizer):
    """Hyperparameter optimizer for the MLLM backend"""

    def _prepare(self, n_jobs: int = 1) -> dict[str, Any]:
        """Collect everything the objective function reads from ``args``.

        Initializes the backend, loads its stored training data and
        generates candidates for every document of the corpus. ``n_jobs``
        is accepted but not used by this method.
        """
        backend = self._backend
        backend.initialize()
        train_x, train_y = backend._load_train_data()

        candidates_per_doc = []
        gold_per_doc = []

        # TODO parallelize generation of candidates
        for document in self._corpus.documents:
            candidates_per_doc.append(backend._generate_candidates(document.text))
            gold_per_doc.append(document.subject_set)

        return {
            "train_x": train_x,
            "train_y": train_y,
            "subject_index": backend.project.subjects,
            "gold_subjects": gold_per_doc,
            "candidates": candidates_per_doc,
            "model_data": backend._model._model_data,
            "metric": self._metric,
        }

    def _postprocess(self, study: Study) -> HPRecommendation:
        """Format the best parameters of ``study`` as a recommendation.

        Each tuned parameter becomes one ``name=value`` line;
        ``max_samples`` is rendered with four decimals. The study's best
        value is passed along as the score.
        """
        best = study.best_params
        recommended = [
            f"min_samples_leaf={best['min_samples_leaf']}",
            f"max_leaf_nodes={best['max_leaf_nodes']}",
            f"max_samples={best['max_samples']:.4f}",
        ]
        return hyperopt.HPRecommendation(lines=recommended, score=study.best_value)
108
109
110
class MLLMBackend(hyperopt.AnnifHyperoptBackend):
    """Maui-like Lexical Matching backend for Annif"""

    name = "mllm"

    # defaults for uninitialized instances
    _model = None

    # file names, relative to the backend data directory
    MODEL_FILE = "mllm-model.gz"
    TRAIN_FILE = "mllm-train.gz"

    DEFAULT_PARAMETERS = {
        "min_samples_leaf": 20,
        "max_leaf_nodes": 1000,
        "max_samples": 0.9,
        "use_hidden_labels": False,
    }

    def get_hp_optimizer(self, corpus: DocumentCorpus, metric: str) -> MLLMOptimizer:
        """Return an MLLMOptimizer for this backend, corpus and metric."""
        return MLLMOptimizer(self, corpus, metric, MLLMHPObjective)

    def _load_model(self) -> MLLMModel:
        """Load the stored model from MODEL_FILE in the data directory.

        Raises NotInitializedException when the file does not exist.
        """
        model_path = os.path.join(self.datadir, self.MODEL_FILE)
        self.debug("loading model from {}".format(model_path))
        if not os.path.exists(model_path):
            raise NotInitializedException(
                "model {} not found".format(model_path), backend_id=self.backend_id
            )
        return MLLMModel.load(model_path)

    def _load_train_data(self) -> tuple[np.ndarray, np.ndarray]:
        """Load the stored training data from TRAIN_FILE using joblib.

        Raises NotInitializedException when the file does not exist.
        """
        train_path = os.path.join(self.datadir, self.TRAIN_FILE)
        if not os.path.exists(train_path):
            raise NotInitializedException(
                "train data file {} not found".format(train_path),
                backend_id=self.backend_id,
            )
        return joblib.load(train_path)

    def initialize(self, parallel: bool = False) -> None:
        """Load the model unless one is already set on this instance.

        ``parallel`` is accepted but not used by this method.
        """
        if self._model is not None:
            return
        self._model = self._load_model()

    def _train(
        self,
        corpus: DocumentCorpus,
        params: dict[str, Any],
        jobs: int = 0,
    ) -> None:
        """Train the model and save it to MODEL_FILE.

        When ``corpus`` equals the string "cached", the stored model and
        training data of a previous run are loaded and reused. Otherwise a
        new MLLMModel prepares training data from ``corpus`` and that data
        is saved to TRAIN_FILE before training.

        Raises NotSupportedException when a non-cached corpus is empty.
        """
        self.info("starting train")
        if corpus != "cached":
            if corpus.is_empty():
                raise NotSupportedException(
                    "training backend {} with no documents".format(self.backend_id)
                )
            self.info("preparing training data")
            self._model = MLLMModel()
            train_data = self._model.prepare_train(
                corpus, self.project.vocab, self.project.analyzer, params, jobs
            )
            # keep the prepared data on disk so a later run can pass "cached"
            annif.util.atomic_save(
                train_data, self.datadir, self.TRAIN_FILE, method=joblib.dump
            )
        else:
            self.info("reusing cached training data from previous run")
            self._model = self._load_model()
            train_data = self._load_train_data()

        self.info("training model")
        train_x, train_y = train_data[0], train_data[1]
        self._model.train(train_x, train_y, params)

        self.info("saving model")
        annif.util.atomic_save(self._model, self.datadir, self.MODEL_FILE)

    def _generate_candidates(self, text: str) -> list[Candidate]:
        """Let the model generate candidates for ``text`` with the project analyzer."""
        analyzer = self.project.analyzer
        return self._model.generate_candidates(text, analyzer)

    def _suggest(self, doc: Document, params: dict[str, Any]) -> Iterator:
        """Generate candidates for ``doc.text``, predict and convert to results."""
        prediction = self._model.predict(self._generate_candidates(doc.text))
        return prediction_to_result(prediction, params, self.project.subjects)
192