Passed
Push — issue678-refactor-suggestionre... ( ec0260...82f1b2 )
by Osma
05:25 queued 02:48
created

annif.backend.pav   A

Complexity

Total Complexity 22

Size/Duplication

Total Lines 148
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 109
dl 0
loc 148
rs 10
c 0
b 0
f 0
wmc 22

7 Methods

Rating   Name   Duplication   Size   Complexity  
A PAVBackend._train() 0 14 4
A PAVBackend._get_model() 0 3 1
A PAVBackend._create_pav_model() 0 22 3
A PAVBackend.default_params() 0 4 1
A PAVBackend.initialize() 0 16 4
A PAVBackend._suggest_train_corpus() 0 32 4
A PAVBackend._normalize_suggestion_batch() 0 16 5
1
"""PAV ensemble backend that combines results from multiple projects and
2
learns which concept suggestions from each backend are trustworthy using the
3
PAV algorithm, a.k.a. isotonic regression, to turn raw scores returned by
4
individual backends into probabilities."""
5
6
import os.path
7
8
import joblib
9
import numpy as np
10
from scipy.sparse import coo_matrix, csc_matrix
11
from sklearn.isotonic import IsotonicRegression
12
13
import annif.corpus
14
import annif.util
15
from annif.exception import NotInitializedException, NotSupportedException
16
from annif.suggestion import SubjectSuggestion, SuggestionBatch
17
18
from . import backend, ensemble
19
20
21
class PAVBackend(ensemble.BaseEnsembleBackend):
    """PAV ensemble backend that combines results from multiple projects"""

    name = "pav"

    # prefix of the per-source model filenames stored in the data directory
    MODEL_FILE_PREFIX = "pav-model-"

    # defaults for uninitialized instances
    _models = None

    DEFAULT_PARAMETERS = {"min-docs": 10}

    def default_params(self):
        """Return the default parameters: the generic AnnifBackend defaults
        overlaid with the PAV-specific ones."""
        params = backend.AnnifBackend.DEFAULT_PARAMETERS.copy()
        params.update(self.DEFAULT_PARAMETERS)
        return params

    def initialize(self, parallel=False):
        """Load the trained PAV models for all source projects from the data
        directory.

        Raises NotInitializedException if any expected model file is
        missing."""
        super().initialize(parallel)
        if self._models is not None:
            return  # already initialized
        sources = annif.util.parse_sources(self.params["sources"])
        # Collect models into a local dict and assign self._models only once
        # everything has loaded; populating self._models incrementally would
        # leave a partially-initialized state after a missing-file error that
        # a later initialize() call would mistake for fully initialized.
        models = {}
        for source_project_id, _ in sources:
            model_filename = self.MODEL_FILE_PREFIX + source_project_id
            path = os.path.join(self.datadir, model_filename)
            if os.path.exists(path):
                self.debug("loading PAV model from {}".format(path))
                models[source_project_id] = joblib.load(path)
            else:
                raise NotInitializedException(
                    "PAV model file '{}' not found".format(path),
                    backend_id=self.backend_id,
                )
        self._models = models

    def _get_model(self, source_project_id):
        """Return the regression models (a dict keyed by subject id) for the
        given source project, initializing this backend first if needed."""
        self.initialize()
        return self._models[source_project_id]

    def _normalize_suggestion_batch(self, batch, source_project):
        """Map the raw scores in a suggestion batch from one source project
        through that source's per-subject PAV models; subjects without a
        trained model keep their raw score. Returns a new SuggestionBatch."""
        reg_models = self._get_model(source_project.project_id)
        pav_batch = []
        for result in batch:
            pav_result = []
            for sugg in result:
                if sugg.subject_id in reg_models:
                    score = reg_models[sugg.subject_id].predict([sugg.score])[0]
                else:  # default to raw score
                    score = sugg.score
                pav_result.append(
                    SubjectSuggestion(subject_id=sugg.subject_id, score=score)
                )
            pav_result.sort(key=lambda hit: hit.score, reverse=True)
            pav_batch.append(pav_result)
        return SuggestionBatch.from_sequence(pav_batch, self.project.subjects)

    @staticmethod
    def _suggest_train_corpus(source_project, corpus):
        """Run the source project over the training corpus and return a pair
        of sparse (ndocs x nsubjects) CSC matrices: the suggested scores and
        the boolean gold-standard labels."""
        # number of subjects is loop-invariant; compute it once
        nsubjects = len(source_project.subjects)
        # lists for constructing score matrix
        data, row, col = [], [], []
        # lists for constructing true label matrix
        trow, tcol = [], []

        ndocs = 0
        for docid, doc in enumerate(corpus.documents):
            hits = source_project.suggest([doc.text])[0]
            vector = hits.as_vector(nsubjects)
            for cid in np.flatnonzero(vector):
                data.append(vector[cid])
                row.append(docid)
                col.append(cid)
            for cid in np.flatnonzero(doc.subject_set.as_vector(nsubjects)):
                trow.append(docid)
                tcol.append(cid)
            ndocs += 1
        scores = coo_matrix(
            (data, (row, col)),
            shape=(ndocs, nsubjects),
            dtype=np.float32,
        )
        true = coo_matrix(
            (np.ones(len(trow), dtype=bool), (trow, tcol)),
            shape=(ndocs, nsubjects),
            dtype=bool,
        )
        return csc_matrix(scores), csc_matrix(true)

    def _create_pav_model(self, source_project_id, min_docs, corpus):
        """Train and save the per-subject isotonic regression models for one
        source project.

        A model is fitted only for subjects with at least min_docs positive
        examples in the corpus; the dict of fitted models is then saved
        atomically into the data directory."""
        self.info(
            "creating PAV model for source {}, min_docs={}".format(
                source_project_id, min_docs
            )
        )
        source_project = self.project.registry.get_project(source_project_id)
        # suggest subjects for the training corpus
        scores, true = self._suggest_train_corpus(source_project, corpus)
        # create the concept-specific PAV regression models
        pav_regressions = {}
        for cid in range(len(source_project.subjects)):
            if true[:, cid].sum() < min_docs:
                continue  # don't create model b/c of too few examples
            reg = IsotonicRegression(out_of_bounds="clip")
            cid_scores = scores[:, cid].toarray().flatten().astype(np.float64)
            reg.fit(cid_scores, true[:, cid].toarray().flatten())
            pav_regressions[cid] = reg
        self.info("created PAV model for {} concepts".format(len(pav_regressions)))
        model_filename = self.MODEL_FILE_PREFIX + source_project_id
        annif.util.atomic_save(
            pav_regressions, self.datadir, model_filename, method=joblib.dump
        )

    def _train(self, corpus, params, jobs=0):
        """Train a PAV model for every configured source project on the given
        corpus.

        Raises NotSupportedException for cached or empty corpora."""
        if corpus == "cached":
            raise NotSupportedException(
                "Training pav project from cached data not supported."
            )
        if corpus.is_empty():
            raise NotSupportedException(
                "training backend {} with no documents".format(self.backend_id)
            )
        self.info("creating PAV models")
        sources = annif.util.parse_sources(self.params["sources"])
        min_docs = int(params["min-docs"])
        for source_project_id, _ in sources:
            self._create_pav_model(source_project_id, min_docs, corpus)