PAVBackend._suggest_train_corpus()   A
last analyzed

Complexity

Conditions 4

Size

Total Lines 34
Code Lines 27

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 27
nop 2
dl 0
loc 34
rs 9.232
c 0
b 0
f 0
1
"""PAV ensemble backend that combines results from multiple projects and
2
learns which concept suggestions from each backend are trustworthy using the
3
PAV algorithm, a.k.a. isotonic regression, to turn raw scores returned by
4
individual backends into probabilities."""
5
6
from __future__ import annotations
7
8
import os.path
9
from typing import TYPE_CHECKING, Any
10
11
import joblib
12
import numpy as np
13
from scipy.sparse import coo_matrix, csc_matrix
14
from sklearn.isotonic import IsotonicRegression
15
16
import annif.corpus
17
import annif.util
18
from annif.exception import NotInitializedException, NotSupportedException
19
from annif.suggestion import SubjectSuggestion, SuggestionBatch
20
21
from . import ensemble
22
23
if TYPE_CHECKING:
24
    from annif.corpus.document import DocumentCorpus
25
    from annif.project import AnnifProject
26
27
28
class PAVBackend(ensemble.BaseEnsembleBackend):
    """PAV ensemble backend that combines results from multiple projects"""

    name = "pav"

    MODEL_FILE_PREFIX = "pav-model-"

    # defaults for uninitialized instances
    # maps source project_id -> {subject_id -> IsotonicRegression} once loaded
    _models = None

    DEFAULT_PARAMETERS = {"min-docs": 10}

    def initialize(self, parallel: bool = False) -> None:
        """Load the per-source PAV regression models from the data directory.

        Raises NotInitializedException if a model file for any configured
        source project is missing (i.e. the backend has not been trained).
        """
        super().initialize(parallel)
        if self._models is not None:
            return  # already initialized
        self._models = {}
        sources = annif.util.parse_sources(self.params["sources"])
        for source_project_id, _ in sources:
            model_filename = self.MODEL_FILE_PREFIX + source_project_id
            path = os.path.join(self.datadir, model_filename)
            if os.path.exists(path):
                self.debug("loading PAV model from {}".format(path))
                self._models[source_project_id] = joblib.load(path)
            else:
                raise NotInitializedException(
                    "PAV model file '{}' not found".format(path),
                    backend_id=self.backend_id,
                )

    def _get_model(self, source_project_id: str) -> dict[int, IsotonicRegression]:
        """Return the per-subject regression models for one source project,
        loading all models first if necessary."""
        self.initialize()
        return self._models[source_project_id]

    def _merge_source_batches(
        self,
        batch_by_source: dict[str, SuggestionBatch],
        sources: list[tuple[str, float]],
        params: dict[str, Any],
    ) -> SuggestionBatch:
        """Calibrate each source's raw suggestion scores through its PAV
        models, then delegate the actual merging to the parent ensemble.

        Subjects without a trained regression model (too few training
        examples) keep their raw score.
        """
        reg_batch_by_source = {}
        for project_id, batch in batch_by_source.items():
            reg_models = self._get_model(project_id)
            pav_batch = [
                [
                    (
                        SubjectSuggestion(
                            subject_id=sugg.subject_id,
                            score=reg_models[sugg.subject_id].predict([sugg.score])[0],
                        )
                        if sugg.subject_id in reg_models
                        else SubjectSuggestion(
                            subject_id=sugg.subject_id, score=sugg.score
                        )
                    )  # default to raw score
                    for sugg in result
                ]
                for result in batch
            ]
            reg_batch_by_source[project_id] = SuggestionBatch.from_sequence(
                pav_batch, self.project.subjects
            )

        return super()._merge_source_batches(reg_batch_by_source, sources, params)

    @staticmethod
    def _suggest_train_corpus(
        source_project: AnnifProject, corpus: DocumentCorpus
    ) -> tuple[csc_matrix, csc_matrix]:
        """Run the source project over the training corpus and collect two
        sparse documents-by-subjects matrices: predicted scores and gold-
        standard (true) labels.

        Returned as CSC matrices because training later slices them by
        column (one subject at a time).
        """
        # lists for constructing score matrix
        data, row, col = [], [], []
        # lists for constructing true label matrix
        trow, tcol = [], []

        ndocs = 0
        for docid, doc in enumerate(corpus.documents):
            hits = source_project.suggest([doc])[0]
            vector = hits.as_vector()
            # record only nonzero scores to keep the matrix sparse
            for cid in np.flatnonzero(vector):
                data.append(vector[cid])
                row.append(docid)
                col.append(cid)
            for cid in np.flatnonzero(
                doc.subject_set.as_vector(len(source_project.subjects))
            ):
                trow.append(docid)
                tcol.append(cid)
            ndocs += 1
        scores = coo_matrix(
            (data, (row, col)),
            shape=(ndocs, len(source_project.subjects)),
            dtype=np.float32,
        )
        true = coo_matrix(
            (np.ones(len(trow), dtype=bool), (trow, tcol)),
            shape=(ndocs, len(source_project.subjects)),
            dtype=bool,
        )
        return csc_matrix(scores), csc_matrix(true)

    def _create_pav_model(
        self, source_project_id: str, min_docs: int, corpus: DocumentCorpus
    ) -> None:
        """Fit one isotonic-regression model per subject for a single source
        project and atomically save the resulting dict to the data directory.

        Subjects with fewer than min_docs positive training examples are
        skipped; their raw scores are used unchanged at suggest time.
        """
        self.info(
            "creating PAV model for source {}, min_docs={}".format(
                source_project_id, min_docs
            )
        )
        source_project = self.project.registry.get_project(source_project_id)
        # suggest subjects for the training corpus
        scores, true = self._suggest_train_corpus(source_project, corpus)
        # create the concept-specific PAV regression models
        pav_regressions = {}
        for cid in range(len(source_project.subjects)):
            if true[:, cid].sum() < min_docs:
                continue  # don't create model b/c of too few examples
            # out_of_bounds="clip" keeps predictions within [min, max] of the
            # training scores instead of raising for unseen values
            reg = IsotonicRegression(out_of_bounds="clip")
            cid_scores = scores[:, cid].toarray().flatten().astype(np.float64)
            reg.fit(cid_scores, true[:, cid].toarray().flatten())
            pav_regressions[cid] = reg
        self.info("created PAV model for {} concepts".format(len(pav_regressions)))
        model_filename = self.MODEL_FILE_PREFIX + source_project_id
        annif.util.atomic_save(
            pav_regressions, self.datadir, model_filename, method=joblib.dump
        )

    def _train(
        self,
        corpus: DocumentCorpus,
        params: dict[str, Any],
        jobs: int = 0,
    ) -> None:
        """Train a PAV model for every configured source project.

        Raises NotSupportedException for cached training data (this backend
        needs the source projects' fresh suggestions) or an empty corpus.
        """
        if corpus == "cached":
            raise NotSupportedException(
                "Training pav project from cached data not supported."
            )
        if corpus.is_empty():
            raise NotSupportedException(
                "training backend {} with no documents".format(self.backend_id)
            )
        self.info("creating PAV models")
        sources = annif.util.parse_sources(self.params["sources"])
        min_docs = int(params["min-docs"])
        for source_project_id, _ in sources:
            self._create_pav_model(source_project_id, min_docs, corpus)