Passed
Pull Request — master (#414)
by Osma
01:47
created

EnsembleBackend.get_hp_optimizer()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 2
dl 0
loc 2
rs 10
c 0
b 0
f 0
cc 1
nop 3
1
"""Ensemble backend that combines results from multiple projects"""
2
3
4
import annif.suggestion
5
import annif.util
6
import annif.eval
7
from . import hyperopt
8
from annif.exception import NotSupportedException
9
10
11
class EnsembleOptimizer(hyperopt.HyperparameterOptimizer):
12
    """Hyperparameter optimizer for the ensemble backend"""
13
14
    def __init__(self, backend, corpus, metric):
15
        super().__init__(backend, corpus, metric)
16
        self._sources = [project_id for project_id, _
17
                         in annif.util.parse_sources(
18
                             backend.config_params['sources'])]
19
20
    def _prepare(self):
21
        self._gold_subjects = []
22
        self._source_hits = []
23
24
        for doc in self._corpus.documents:
25
            self._gold_subjects.append(
26
                annif.corpus.SubjectSet((doc.uris, doc.labels)))
27
            srchits = {}
28
            for project_id in self._sources:
29
                registry = self._backend.project.registry
30
                source_project = registry.get_project(project_id)
31
                hits = source_project.suggest(doc.text)
32
                srchits[project_id] = hits
33
            self._source_hits.append(srchits)
34
35
    def _normalize(self, hps):
36
        total = sum(hps.values())
37
        return {source: hps[source] / total for source in hps}
38
39
    def _format_cfg_line(self, hps):
40
        return 'sources=' + ','.join([f"{src}:{weight:.4f}"
41
                                      for src, weight in hps.items()])
42
43
    def _objective(self, trial):
44
        batch = annif.eval.EvaluationBatch(self._backend.project.subjects)
45
        weights = {project_id: trial.suggest_uniform(project_id, 0.0, 1.0)
46
                   for project_id in self._sources}
47
        for goldsubj, srchits in zip(self._gold_subjects, self._source_hits):
48
            weighted_hits = []
49
            for project_id, hits in srchits.items():
50
                weighted_hits.append(annif.suggestion.WeightedSuggestion(
51
                    hits=hits,
52
                    weight=weights[project_id],
53
                    subjects=self._backend.project.subjects))
54
            batch.evaluate(
55
                annif.util.merge_hits(
56
                    weighted_hits,
57
                    self._backend.project.subjects),
58
                goldsubj)
59
        results = batch.results(metrics=[self._metric])
60
        return results[self._metric]
61
62
    def _postprocess(self, study):
63
        line = self._format_cfg_line(self._normalize(study.best_params))
64
        return hyperopt.HPRecommendation(lines=[line], score=study.best_value)
65
66
67
class EnsembleBackend(hyperopt.AnnifHyperoptBackend):
68
    """Ensemble backend that combines results from multiple projects"""
69
    name = "ensemble"
70
71
    def get_hp_optimizer(self, corpus, metric):
72
        return EnsembleOptimizer(self, corpus, metric)
73
74
    @property
75
    def is_trained(self):
76
        sources_trained = self._get_sources_attribute('is_trained')
77
        return all(sources_trained)
78
79
    @property
80
    def modification_time(self):
81
        mtimes = self._get_sources_attribute('modification_time')
82
        return max(filter(None, mtimes), default=None)
83
84
    def _get_sources_attribute(self, attr):
85
        params = self._get_backend_params(None)
86
        sources = annif.util.parse_sources(params['sources'])
87
        return [getattr(self.project.registry.get_project(project_id), attr)
88
                for project_id, _ in sources]
89
90
    def initialize(self):
91
        # initialize all the source projects
92
        params = self._get_backend_params(None)
93
        for project_id, _ in annif.util.parse_sources(params['sources']):
94
            project = self.project.registry.get_project(project_id)
95
            project.initialize()
96
97
    def _normalize_hits(self, hits, source_project):
98
        """Hook for processing hits from backends. Intended to be overridden
99
        by subclasses."""
100
        return hits
101
102
    def _suggest_with_sources(self, text, sources):
103
        hits_from_sources = []
104
        for project_id, weight in sources:
105
            source_project = self.project.registry.get_project(project_id)
106
            hits = source_project.suggest(text)
107
            self.debug(
108
                'Got {} hits from project {}, weight {}'.format(
109
                    len(hits), source_project.project_id, weight))
110
            norm_hits = self._normalize_hits(hits, source_project)
111
            hits_from_sources.append(
112
                annif.suggestion.WeightedSuggestion(
113
                    hits=norm_hits,
114
                    weight=weight,
115
                    subjects=source_project.subjects))
116
        return hits_from_sources
117
118
    def _merge_hits_from_sources(self, hits_from_sources, params):
119
        """Hook for merging hits from sources. Can be overridden by
120
        subclasses."""
121
        return annif.util.merge_hits(hits_from_sources, self.project.subjects)
122
123
    def _suggest(self, text, params):
124
        sources = annif.util.parse_sources(params['sources'])
125
        hits_from_sources = self._suggest_with_sources(text, sources)
126
        merged_hits = self._merge_hits_from_sources(hits_from_sources, params)
127
        self.debug('{} hits after merging'.format(len(merged_hits)))
128
        return merged_hits
129
130
    def _train(self, corpus, params):
131
        raise NotSupportedException('Training ensemble model is not possible.')
132