Passed
Push — issue703-python-3.11-support ( f59527...05d52a )
by Juho
04:06 queued 14s
created

annif.backend.pav.PAVBackend.initialize()   A

Complexity

Conditions 4

Size

Total Lines 16
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 15
nop 2
dl 0
loc 16
rs 9.65
c 0
b 0
f 0
1
"""PAV ensemble backend that combines results from multiple projects and
2
learns which concept suggestions from each backend are trustworthy using the
3
PAV algorithm, a.k.a. isotonic regression, to turn raw scores returned by
4
individual backends into probabilities."""
5
from __future__ import annotations
6
7
import os.path
8
from typing import TYPE_CHECKING, Any
9
10
import joblib
11
import numpy as np
12
from scipy.sparse import coo_matrix, csc_matrix
13
from sklearn.isotonic import IsotonicRegression
14
15
import annif.corpus
16
import annif.util
17
from annif.exception import NotInitializedException, NotSupportedException
18
from annif.suggestion import SubjectSuggestion, SuggestionBatch
19
20
from . import ensemble
21
22
if TYPE_CHECKING:
23
    from annif.corpus.document import DocumentCorpus
24
    from annif.project import AnnifProject
25
26
27
class PAVBackend(ensemble.BaseEnsembleBackend):
    """PAV ensemble backend that combines results from multiple projects"""

    name = "pav"

    MODEL_FILE_PREFIX = "pav-model-"

    # defaults for uninitialized instances; initialize() replaces this with a
    # dict mapping source project id -> {subject_id: IsotonicRegression}
    _models = None

    DEFAULT_PARAMETERS = {"min-docs": 10}

    def initialize(self, parallel: bool = False) -> None:
        """Load the per-source PAV (isotonic regression) models from the
        data directory.

        :param parallel: passed through to the base ensemble initialization
        :raises NotInitializedException: if a model file for any configured
            source project is missing
        """
        super().initialize(parallel)
        if self._models is not None:
            return  # already initialized
        models = {}
        sources = annif.util.parse_sources(self.params["sources"])
        for source_project_id, _ in sources:
            model_filename = self.MODEL_FILE_PREFIX + source_project_id
            path = os.path.join(self.datadir, model_filename)
            if not os.path.exists(path):
                raise NotInitializedException(
                    "PAV model file '{}' not found".format(path),
                    backend_id=self.backend_id,
                )
            self.debug("loading PAV model from {}".format(path))
            models[source_project_id] = joblib.load(path)
        # Assign only after every model has loaded successfully, so a failed
        # (partial) load does not leave self._models non-None and make later
        # initialize() calls wrongly believe the backend is ready.
        self._models = models

    def _get_model(self, source_project_id: str) -> dict[int, IsotonicRegression]:
        """Return the mapping of subject id to fitted IsotonicRegression for
        the given source project, initializing lazily if needed."""
        self.initialize()
        return self._models[source_project_id]

    def _merge_source_batches(
        self,
        batch_by_source: dict[str, SuggestionBatch],
        sources: list[tuple[str, float]],
        params: dict[str, Any],
    ) -> SuggestionBatch:
        """Calibrate each source's raw suggestion scores into probabilities
        using its PAV models, then delegate the actual merging to the base
        ensemble implementation.

        Subjects without a trained regression model (too few training
        examples) keep their raw score unchanged.
        """
        reg_batch_by_source = {}
        for project_id, batch in batch_by_source.items():
            reg_models = self._get_model(project_id)
            pav_batch = [
                [
                    SubjectSuggestion(
                        subject_id=sugg.subject_id,
                        score=reg_models[sugg.subject_id].predict([sugg.score])[0],
                    )
                    if sugg.subject_id in reg_models
                    else SubjectSuggestion(
                        subject_id=sugg.subject_id, score=sugg.score
                    )  # default to raw score
                    for sugg in result
                ]
                for result in batch
            ]
            reg_batch_by_source[project_id] = SuggestionBatch.from_sequence(
                pav_batch, self.project.subjects
            )

        return super()._merge_source_batches(reg_batch_by_source, sources, params)

    @staticmethod
    def _suggest_train_corpus(
        source_project: AnnifProject, corpus: DocumentCorpus
    ) -> tuple[csc_matrix, csc_matrix]:
        """Run the source project on every document of the training corpus.

        :return: a pair of sparse CSC matrices of shape
            (n_documents, n_subjects): the suggested scores and the boolean
            gold-standard labels
        """
        # lists for constructing score matrix
        data, row, col = [], [], []
        # lists for constructing true label matrix
        trow, tcol = [], []

        ndocs = 0
        for docid, doc in enumerate(corpus.documents):
            hits = source_project.suggest([doc.text])[0]
            vector = hits.as_vector()
            # record only the nonzero suggested scores
            for cid in np.flatnonzero(vector):
                data.append(vector[cid])
                row.append(docid)
                col.append(cid)
            # record the gold-standard subject assignments
            for cid in np.flatnonzero(
                doc.subject_set.as_vector(len(source_project.subjects))
            ):
                trow.append(docid)
                tcol.append(cid)
            ndocs += 1
        scores = coo_matrix(
            (data, (row, col)),
            shape=(ndocs, len(source_project.subjects)),
            dtype=np.float32,
        )
        true = coo_matrix(
            (np.ones(len(trow), dtype=bool), (trow, tcol)),
            shape=(ndocs, len(source_project.subjects)),
            dtype=bool,
        )
        # CSC gives efficient column (per-subject) slicing during training
        return csc_matrix(scores), csc_matrix(true)

    def _create_pav_model(
        self, source_project_id: str, min_docs: int, corpus: DocumentCorpus
    ) -> None:
        """Fit one IsotonicRegression per subject for a single source project
        and save the resulting model dict atomically to the data directory.

        :param min_docs: minimum number of positive examples a subject must
            have in the corpus for a model to be fitted for it
        """
        self.info(
            "creating PAV model for source {}, min_docs={}".format(
                source_project_id, min_docs
            )
        )
        source_project = self.project.registry.get_project(source_project_id)
        # suggest subjects for the training corpus
        scores, true = self._suggest_train_corpus(source_project, corpus)
        # create the concept-specific PAV regression models
        pav_regressions = {}
        for cid in range(len(source_project.subjects)):
            if true[:, cid].sum() < min_docs:
                continue  # don't create model b/c of too few examples
            # out_of_bounds="clip" keeps predictions for unseen score ranges
            # inside [min, max] of the training scores
            reg = IsotonicRegression(out_of_bounds="clip")
            cid_scores = scores[:, cid].toarray().flatten().astype(np.float64)
            reg.fit(cid_scores, true[:, cid].toarray().flatten())
            pav_regressions[cid] = reg
        self.info("created PAV model for {} concepts".format(len(pav_regressions)))
        model_filename = self.MODEL_FILE_PREFIX + source_project_id
        annif.util.atomic_save(
            pav_regressions, self.datadir, model_filename, method=joblib.dump
        )

    def _train(
        self,
        corpus: DocumentCorpus,
        params: dict[str, Any],
        jobs: int = 0,
    ) -> None:
        """Train a PAV model for each configured source project.

        :raises NotSupportedException: when asked to train from cached data
            or from an empty corpus
        """
        if corpus == "cached":
            raise NotSupportedException(
                "Training pav project from cached data not supported."
            )
        if corpus.is_empty():
            raise NotSupportedException(
                "training backend {} with no documents".format(self.backend_id)
            )
        self.info("creating PAV models")
        sources = annif.util.parse_sources(self.params["sources"])
        min_docs = int(params["min-docs"])
        for source_project_id, _ in sources:
            self._create_pav_model(source_project_id, min_docs, corpus)