Hyperactive._create_shared_memory()   B
last analyzed

Complexity

Conditions 7

Size

Total Lines 25
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
eloc 19
nop 1
dl 0
loc 25
rs 8
c 0
b 0
f 0
1
# Author: Simon Blanke
2
# Email: [email protected]
3
# License: MIT License
4
5
6
import copy
7
import multiprocessing as mp
8
import pandas as pd
9
10
from typing import Union, List, Dict, Type
11
12
from .optimizers import RandomSearchOptimizer
13
from .run_search import run_search
14
15
from .results import Results
16
from .print_results import PrintResults
17
from .search_space import SearchSpace
18
19
20
class Hyperactive:
    """
    Initialize the Hyperactive class to manage optimization processes.

    Parameters:
    - verbosity: List of verbosity levels, False to disable all output, or
      None for the default ["progress_bar", "print_results", "print_times"]
    - distribution: String indicating the distribution method (default: "multiprocessing")
    - n_processes: Number of processes to run in parallel or "auto" to determine automatically (default: "auto")

    Methods:
    - add_search: Add a new optimization search process with specified parameters
    - run: Execute the optimization searches
    - best_para: Get the best parameters for a specific search
    - best_score: Get the best score for a specific search
    - search_data: Get the search data for a specific search
    """

    def __init__(
        self,
        verbosity: Union[list, bool, None] = None,
        distribution: str = "multiprocessing",
        n_processes: Union[str, int] = "auto",
    ):
        super().__init__()

        # Fix: the previous mutable-list default argument was shared between
        # all calls/instances; None now stands in for the default levels.
        if verbosity is None:
            verbosity = ["progress_bar", "print_results", "print_times"]
        elif verbosity is False:
            verbosity = []

        self.verbosity = verbosity
        self.distribution = distribution
        self.n_processes = n_processes

        # Maps nth_process (int) -> optimizer instance; filled by add_search.
        self.opt_pros = {}

    def _create_shared_memory(self):
        """
        Give optimizers that use memory="share" and optimize the same
        objective function a common multiprocessing manager dict.
        """
        # Bundle the "share"-memory optimizers by objective-function name.
        _bundle_opt_processes = {}
        for opt_pros in self.opt_pros.values():
            if opt_pros.memory != "share":
                continue
            name = opt_pros.objective_function.__name__
            _bundle_opt_processes.setdefault(name, []).append(opt_pros)

        for opt_pros_l in _bundle_opt_processes.values():
            # Check if the lengths of the search spaces of all optimizers
            # in the list are the same.
            dim_counts = {len(opt_pros.s_space()) for opt_pros in opt_pros_l}
            if len(dim_counts) == 1:
                manager = mp.Manager()  # get new manager.dict
                shared_memory = manager.dict()
                for opt_pros in opt_pros_l:
                    opt_pros.memory = shared_memory
            else:
                # NOTE(review): at this point opt_pros_l[0].memory is still
                # the string "share", not a manager dict, so nothing is
                # actually shared here — confirm this branch is intentional.
                for opt_pros in opt_pros_l:
                    opt_pros.memory = opt_pros_l[0].memory  # get same manager.dict

    @staticmethod
    def _default_opt(optimizer):
        """Resolve the "default" shorthand to a RandomSearchOptimizer and
        return a deep copy so each search gets an independent optimizer."""
        if isinstance(optimizer, str):
            if optimizer == "default":
                optimizer = RandomSearchOptimizer()
        return copy.deepcopy(optimizer)

    @staticmethod
    def _default_search_id(search_id, objective_function):
        """Fall back to the objective function's name when no id is given."""
        if not search_id:
            search_id = objective_function.__name__
        return search_id

    @staticmethod
    def check_list(search_space):
        """Print a warning for every search-space dimension that is not a list."""
        for key in search_space.keys():
            search_dim = search_space[key]

            error_msg = "Value in '{}' of search space dictionary must be of type list".format(
                key
            )
            if not isinstance(search_dim, list):
                # Deliberately a warning only; raising here would break
                # existing callers that pass non-list dimensions.
                print("Warning", error_msg)

    def add_search(
        self,
        objective_function: callable,
        search_space: Dict[str, list],
        n_iter: int,
        search_id=None,
        optimizer: Union[str, Type[RandomSearchOptimizer]] = "default",
        n_jobs: int = 1,
        initialize: Dict[str, int] = None,
        constraints: List[callable] = None,
        pass_through: Dict = None,
        callbacks: Dict[str, callable] = None,
        catch: Dict = None,
        max_score: float = None,
        early_stopping: Dict = None,
        random_state: int = None,
        memory: Union[str, bool] = "share",
        memory_warm_start: pd.DataFrame = None,
    ):
        """
        Add a new optimization search process with specified parameters.

        Parameters:
        - objective_function: The objective function to optimize.
        - search_space: Dictionary defining the search space for optimization.
        - n_iter: Number of iterations for the optimization process.
        - search_id: Identifier for the search process (default: None).
        - optimizer: The optimizer to use for the search process (default: "default").
        - n_jobs: Number of parallel jobs to run; -1 uses all CPUs (default: 1).
        - initialize: Dictionary specifying initialization parameters
          (default: {"grid": 4, "random": 2, "vertices": 4}).
        - constraints: List of constraint functions (default: None).
        - pass_through: Dictionary of additional parameters to pass through (default: None).
        - callbacks: Dictionary of callback functions (default: None).
        - catch: Dictionary of exceptions to catch during optimization (default: None).
        - max_score: Maximum score to achieve (default: None).
        - early_stopping: Dictionary specifying early stopping criteria (default: None).
        - random_state: Seed for random number generation (default: None).
        - memory: Option to share memory between processes (default: "share").
        - memory_warm_start: DataFrame containing warm start memory (default: None).
        """

        self.check_list(search_space)

        # Fix: the previous mutable dict default for `initialize` was shared
        # between calls; None now stands in for it. An explicitly passed
        # empty dict is respected (no `or` shortcut here).
        if initialize is None:
            initialize = {"grid": 4, "random": 2, "vertices": 4}

        constraints = constraints or []
        pass_through = pass_through or {}
        callbacks = callbacks or {}
        catch = catch or {}
        early_stopping = early_stopping or {}

        optimizer = self._default_opt(optimizer)
        search_id = self._default_search_id(search_id, objective_function)
        s_space = SearchSpace(search_space)

        optimizer.setup_search(
            objective_function=objective_function,
            s_space=s_space,
            n_iter=n_iter,
            initialize=initialize,
            constraints=constraints,
            pass_through=pass_through,
            callbacks=callbacks,
            catch=catch,
            max_score=max_score,
            early_stopping=early_stopping,
            random_state=random_state,
            memory=memory,
            memory_warm_start=memory_warm_start,
            verbosity=self.verbosity,
        )

        n_jobs = mp.cpu_count() if n_jobs == -1 else n_jobs

        # Register the same optimizer once per job under consecutive
        # process indices.
        for _ in range(n_jobs):
            nth_process = len(self.opt_pros)
            self.opt_pros[nth_process] = optimizer

    def _print_info(self):
        """Print per-process results according to the verbosity settings."""
        print_res = PrintResults(self.opt_pros, self.verbosity)

        if self.verbosity:
            for _ in range(len(self.opt_pros)):
                print("")

        for results in self.results_list:
            nth_process = results["nth_process"]
            print_res.print_process(results, nth_process)

    def run(self, max_time: float = None):
        """
        Run the optimization process with an optional maximum time limit.

        Args:
            max_time (float, optional): Maximum time limit for the optimization process. Defaults to None.
        """

        self._create_shared_memory()

        for opt in self.opt_pros.values():
            opt.max_time = max_time

        self.results_list = run_search(
            self.opt_pros, self.distribution, self.n_processes
        )

        self.results_ = Results(self.results_list, self.opt_pros)

        self._print_info()

    def best_para(self, id_):
        """
        Retrieve the best parameters for a specific ID from the results.

        Parameters:
        - id_ (int): The ID of the parameters to retrieve.

        Returns:
        - Union[Dict[str, Union[int, float]], None]: The best parameters for the specified ID if found, otherwise None.

        Raises:
        - ValueError: If the objective function name is not recognized.
        """

        return self.results_.best_para(id_)

    def best_score(self, id_):
        """
        Return the best score for a specific ID from the results.

        Parameters:
        - id_ (int): The ID for which the best score is requested.
        """

        return self.results_.best_score(id_)

    def search_data(self, id_, times=False):
        """
        Retrieve search data for a specific ID from the results. Optionally exclude evaluation and iteration times if 'times' is set to False.

        Parameters:
        - id_ (int): The ID of the search data to retrieve.
        - times (bool, optional): Whether to exclude evaluation and iteration times. Defaults to False.

        Returns:
        - pd.DataFrame: The search data for the specified ID.

        columns are

            * "score" : float - The score of the search
            * "n_columns" : int - The number of columns in the search space
            * "metadata" : dict - The metadata returned by the search

        each row is a search iteration

        index is RangeIndex
        """

        search_data_ = self.results_.search_data(id_)

        # Fix: idiomatic truthiness check instead of `== False`.
        if not times:
            search_data_.drop(
                labels=["eval_times", "iter_times"],
                axis=1,
                inplace=True,
                errors="ignore",
            )
        return search_data_