Passed
Push — issue688-hyperopt-multiprocess... ( e12c04 )
by Osma
03:24
created

annif.backend.hyperopt   A

Complexity

Total Complexity 15

Size/Duplication

Total Lines 152
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 85
dl 0
loc 152
rs 10
c 0
b 0
f 0
wmc 15

10 Methods

Rating   Name   Duplication   Size   Complexity  
A HyperparameterOptimizer.optimize() 0 28 5
A HyperparameterOptimizer._prepare() 0 6 1
A TrialWriter.__init__() 0 4 1
A HyperparameterOptimizer._normalize() 0 4 1
A AnnifHyperoptBackend.get_hp_optimizer() 0 7 1
A TrialWriter.write() 0 17 2
A HyperparameterOptimizer._objective() 0 4 1
A HyperparameterOptimizer._run_trial() 0 21 1
A HyperparameterOptimizer.__init__() 0 6 1
A HyperparameterOptimizer._postprocess() 0 4 1
1
"""Hyperparameter optimization functionality for backends"""
2
3
from __future__ import annotations

import abc
import collections
import os
import tempfile
from typing import TYPE_CHECKING, Any, Callable

import optuna
import optuna.exceptions

import annif.parallel

from .backend import AnnifBackend

if TYPE_CHECKING:
    from click.utils import LazyFile
    from optuna.study.study import Study
    from optuna.trial import Trial

    from annif.corpus.document import DocumentCorpus
23
24
HPRecommendation = collections.namedtuple("HPRecommendation", "lines score")
25
26
27
class TrialWriter:
    """Writes hyperparameter optimization trial results into a TSV file."""

    def __init__(self, results_file: LazyFile, normalize_func: Callable) -> None:
        # target file object and the function used to normalize raw
        # hyperparameter values before writing them out
        self.results_file = results_file
        self.normalize_func = normalize_func
        self.header_written = False

    def write(self, trial_data: dict[str, Any]) -> None:
        """Append the results of one trial to the results file, emitting
        the TSV header row before the first data row."""

        if not self.header_written:
            # header columns are fixed fields followed by the parameter names
            # taken from the first trial seen
            header_cols = ["trial", "value"] + list(trial_data["params"].keys())
            print("\t".join(header_cols), file=self.results_file)
            self.header_written = True

        normalized = self.normalize_func(trial_data["params"])
        row = [trial_data["number"], trial_data["value"], *normalized.values()]
        print("\t".join(str(cell) for cell in row), file=self.results_file)
54
55
56
class HyperparameterOptimizer:
    """Base class for hyperparameter optimizers"""

    def __init__(
        self, backend: AnnifBackend, corpus: DocumentCorpus, metric: str
    ) -> None:
        self._backend = backend
        self._corpus = corpus
        self._metric = metric

    def _prepare(self, n_jobs: int = 1):
        """Prepare the optimizer for hyperparameter evaluation.  Up to
        n_jobs parallel threads or processes may be used during the
        operation."""

        pass  # pragma: no cover

    @abc.abstractmethod
    def _objective(self, trial: Trial) -> float:
        """Objective function to optimize"""
        pass  # pragma: no cover

    @abc.abstractmethod
    def _postprocess(self, study: Study) -> HPRecommendation:
        """Convert the study results into hyperparameter recommendations"""
        pass  # pragma: no cover

    def _normalize(self, hps: dict[str, float]) -> dict[str, float]:
        """Normalize the given raw hyperparameters. Intended to be overridden
        by subclasses when necessary. The default is to keep them as-is."""
        return hps

    def _run_trial(
        self, trial_id: int, storage_url: str, study_name: str
    ) -> dict[str, Any]:
        """Run one optimization trial against the shared study in the given
        storage and return its number, value and params as a plain dict
        (plain so the result can be passed back from a worker process)."""

        # use a callback to set the completed trial, to avoid race conditions
        # with other workers completing trials in the same shared study
        completed_trial = []

        def set_trial_callback(study: Study, trial: Trial) -> None:
            completed_trial.append(trial)

        study = optuna.load_study(storage=storage_url, study_name=study_name)
        study.optimize(
            self._objective,
            n_trials=1,
            callbacks=[set_trial_callback],
        )

        return {
            "number": completed_trial[0].number,
            "value": completed_trial[0].value,
            "params": completed_trial[0].params,
        }

    def optimize(
        self, n_trials: int, n_jobs: int, results_file: LazyFile | None
    ) -> HPRecommendation:
        """Find the optimal hyperparameters by testing up to the given number
        of hyperparameter combinations, running up to n_jobs trials in
        parallel.  If results_file is given, each completed trial is written
        to it as a TSV row."""

        self._prepare(n_jobs)

        writer = TrialWriter(results_file, self._normalize) if results_file else None
        write_callback = writer.write if writer else None

        # SQLite database shared between the worker processes; delete=False so
        # the file outlives the closed handle (workers reopen it via the URL).
        # BUG FIX: the file was previously leaked — now removed in the finally
        # block below.
        temp_db = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
        temp_db.close()
        storage_url = f"sqlite:///{temp_db.name}"

        try:
            study = optuna.create_study(direction="maximize", storage=storage_url)

            jobs, pool_class = annif.parallel.get_pool(n_jobs)
            with pool_class(jobs) as pool:
                for i in range(n_trials):
                    pool.apply_async(
                        self._run_trial,
                        args=(i, storage_url, study.study_name),
                        callback=write_callback,
                    )
                pool.close()
                pool.join()

            # postprocess before the finally clause deletes the database, as
            # the study may still query its storage here
            return self._postprocess(study)
        finally:
            os.remove(temp_db.name)
139
140
141
class AnnifHyperoptBackend(AnnifBackend):
    """Base class for Annif backends that can perform hyperparameter
    optimization"""

    @abc.abstractmethod
    def get_hp_optimizer(self, corpus: DocumentCorpus, metric: str):
        """Return a HyperparameterOptimizer object that searches for the
        optimal hyperparameter combination for the given corpus, as
        measured by the given metric"""

        pass  # pragma: no cover
152