Passed
Push — issue678-refactor-suggestionre... ( 0d7003...d7e7fa )
by Osma
02:49
created

annif.backend.pav   A

Complexity

Total Complexity 20

Size/Duplication

Total Lines 155
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 112
dl 0
loc 155
rs 10
c 0
b 0
f 0
wmc 20

7 Methods

Rating   Name   Duplication   Size   Complexity  
A PAVBackend._get_model() 0 3 1
A PAVBackend.default_params() 0 4 1
A PAVBackend.initialize() 0 16 4
A PAVBackend._train() 0 14 4
A PAVBackend._merge_source_batches() 0 23 3
A PAVBackend._suggest_train_corpus() 0 32 4
A PAVBackend._create_pav_model() 0 22 3
1
"""PAV ensemble backend that combines results from multiple projects and
2
learns which concept suggestions from each backend are trustworthy using the
3
PAV algorithm, a.k.a. isotonic regression, to turn raw scores returned by
4
individual backends into probabilities."""
5
6
import os.path
7
8
import joblib
9
import numpy as np
10
from scipy.sparse import coo_matrix, csc_matrix
11
from sklearn.isotonic import IsotonicRegression
12
13
import annif.corpus
14
import annif.util
15
from annif.exception import NotInitializedException, NotSupportedException
16
from annif.suggestion import SubjectSuggestion, SuggestionBatch
17
18
from . import backend, ensemble
19
20
21
class PAVBackend(ensemble.BaseEnsembleBackend):
    """PAV ensemble backend that combines results from multiple projects"""

    name = "pav"

    MODEL_FILE_PREFIX = "pav-model-"

    # defaults for uninitialized instances
    # maps source project id -> {subject id -> IsotonicRegression model}
    _models = None

    DEFAULT_PARAMETERS = {"min-docs": 10}

    def default_params(self):
        """Return the default parameters: the generic backend defaults
        overlaid with the PAV-specific ones."""
        params = backend.AnnifBackend.DEFAULT_PARAMETERS.copy()
        params.update(self.DEFAULT_PARAMETERS)
        return params

    def initialize(self, parallel=False):
        """Load the trained PAV regression models for every configured
        source project.

        Raises NotInitializedException if a model file is missing."""
        super().initialize(parallel)
        if self._models is not None:
            return  # already initialized
        self._models = {}
        sources = annif.util.parse_sources(self.params["sources"])
        for source_project_id, _ in sources:
            model_filename = self.MODEL_FILE_PREFIX + source_project_id
            path = os.path.join(self.datadir, model_filename)
            # guard clause: fail fast on a missing model file
            if not os.path.exists(path):
                raise NotInitializedException(
                    "PAV model file '{}' not found".format(path),
                    backend_id=self.backend_id,
                )
            self.debug("loading PAV model from {}".format(path))
            self._models[source_project_id] = joblib.load(path)

    def _get_model(self, source_project_id):
        """Return the PAV models for the given source project, loading the
        model files on demand."""
        self.initialize()
        return self._models[source_project_id]

    @staticmethod
    def _calibrate_suggestion(reg_models, sugg):
        """Turn one raw suggestion into a calibrated one: map its score
        through the concept-specific regression model when one exists,
        otherwise keep the raw score."""
        if sugg.subject_id in reg_models:
            score = reg_models[sugg.subject_id].predict([sugg.score])[0]
        else:
            score = sugg.score  # default to raw score
        return SubjectSuggestion(subject_id=sugg.subject_id, score=score)

    def _merge_source_batches(self, batch_by_source, sources, params):
        """Calibrate each source's suggestion scores with its PAV models,
        then delegate the actual merging to the parent ensemble backend."""
        reg_batch_by_source = {}
        for project_id, batch in batch_by_source.items():
            reg_models = self._get_model(project_id)
            pav_batch = [
                [self._calibrate_suggestion(reg_models, sugg) for sugg in result]
                for result in batch
            ]
            reg_batch_by_source[project_id] = SuggestionBatch.from_sequence(
                pav_batch, self.project.subjects
            )

        return super()._merge_source_batches(reg_batch_by_source, sources, params)

    @staticmethod
    def _suggest_train_corpus(source_project, corpus):
        """Run the source project on every document of the corpus and
        return a pair of sparse CSC matrices of shape
        (n_docs, n_subjects): the raw suggestion scores and the boolean
        gold-standard labels."""
        # lists for constructing score matrix
        data, row, col = [], [], []
        # lists for constructing true label matrix
        trow, tcol = [], []

        ndocs = 0
        for docid, doc in enumerate(corpus.documents):
            hits = source_project.suggest([doc.text])[0]
            vector = hits.as_vector()
            for cid in np.flatnonzero(vector):
                data.append(vector[cid])
                row.append(docid)
                col.append(cid)
            for cid in np.flatnonzero(
                doc.subject_set.as_vector(len(source_project.subjects))
            ):
                trow.append(docid)
                tcol.append(cid)
            ndocs += 1
        scores = coo_matrix(
            (data, (row, col)),
            shape=(ndocs, len(source_project.subjects)),
            dtype=np.float32,
        )
        true = coo_matrix(
            (np.ones(len(trow), dtype=bool), (trow, tcol)),
            shape=(ndocs, len(source_project.subjects)),
            dtype=bool,
        )
        # CSC format makes the per-concept column slicing in
        # _create_pav_model efficient
        return csc_matrix(scores), csc_matrix(true)

    def _create_pav_model(self, source_project_id, min_docs, corpus):
        """Train concept-specific isotonic regression models for one
        source project and save them to the data directory.

        Concepts with fewer than min_docs positive training examples are
        skipped."""
        self.info(
            "creating PAV model for source {}, min_docs={}".format(
                source_project_id, min_docs
            )
        )
        source_project = self.project.registry.get_project(source_project_id)
        # suggest subjects for the training corpus
        scores, true = self._suggest_train_corpus(source_project, corpus)
        # create the concept-specific PAV regression models
        pav_regressions = {}
        for cid in range(len(source_project.subjects)):
            if true[:, cid].sum() < min_docs:
                continue  # don't create model b/c of too few examples
            reg = IsotonicRegression(out_of_bounds="clip")
            cid_scores = scores[:, cid].toarray().flatten().astype(np.float64)
            reg.fit(cid_scores, true[:, cid].toarray().flatten())
            pav_regressions[cid] = reg
        self.info("created PAV model for {} concepts".format(len(pav_regressions)))
        model_filename = self.MODEL_FILE_PREFIX + source_project_id
        annif.util.atomic_save(
            pav_regressions, self.datadir, model_filename, method=joblib.dump
        )

    def _train(self, corpus, params, jobs=0):
        """Train PAV models for all configured source projects.

        Raises NotSupportedException for cached or empty training data."""
        if corpus == "cached":
            raise NotSupportedException(
                "Training pav project from cached data not supported."
            )
        if corpus.is_empty():
            raise NotSupportedException(
                "training backend {} with no documents".format(self.backend_id)
            )
        self.info("creating PAV models")
        sources = annif.util.parse_sources(self.params["sources"])
        min_docs = int(params["min-docs"])
        for source_project_id, _ in sources:
            self._create_pav_model(source_project_id, min_docs, corpus)
155