Passed
Push — master (b4d881...21cb52) by Simon, created 02:57

_pytorch_optimizer.ComplexModel.forward() (Grade: A)

Complexity
  Conditions: 1

Size
  Total Lines: 2
  Code Lines: 2

Duplication
  Lines: 0
  Ratio: 0 %

Importance
  Changes: 0
Metric  Value
eloc    2
dl      0
loc     2
rs      10
c       0
b       0
f       0
cc      1
nop     2
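(Of the analyzer's short metric keys, cc is cyclomatic complexity and matches the single condition counted above; nop presumably counts forward()'s two parameters, self and x; eloc and loc agree with the two code lines reported under Size. The remaining keys are shown as emitted.)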
import torch
import torch.nn as nn
import numpy as np
from torch.utils.data import DataLoader, TensorDataset
from gradient_free_optimizers import (
    HillClimbingOptimizer,
    RandomSearchOptimizer,
    RepulsingHillClimbingOptimizer,
    PowellsMethod,
)

# Define a synthetic dataset
# np.random.seed(42)
X = np.random.rand(1000, 20)
true_weights = np.random.rand(20, 1)
y = X @ true_weights + 0.1 * np.random.randn(1000, 1)

X = torch.Tensor(X)
y = torch.Tensor(y)

# Create a DataLoader
dataset = TensorDataset(X, y)
dataloader = DataLoader(dataset, batch_size=64, shuffle=True)
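
# The targets are linear in X with Gaussian noise of std 0.1, so the
# irreducible MSE is about 0.01, a useful reference for the epoch losses
# printed below.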

num_epochs = 10

# Define a more complex neural network
class ComplexModel(nn.Module):
    def __init__(self):
        super(ComplexModel, self).__init__()
        self.network = nn.Sequential(
            nn.Linear(20, 64),
            nn.ReLU(),
            nn.Linear(64, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
        )

    def forward(self, x):
        return self.network(x)
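
# Note: this model has 20*64+64 + 64*64+64 + 64*1+1 = 5569 scalar weights,
# so the GFO search space defined below gets one dimension per weight.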

# Initialize the model
model = ComplexModel()

# Define a loss function
criterion = nn.MSELoss()

# Define the custom optimizer with GFO
class GFOOptimizer(torch.optim.Optimizer):
    def __init__(self, params, model, dataloader, criterion, lr=1e-3):
        self.model = model
        self.dataloader = dataloader
        self.criterion = criterion
        self.lr = lr

        # Flatten the initial model parameters
        self.initial_weights = {}
        self.params = []
        counter = 0
        for param in self.model.parameters():
            self.params.extend(param.data.cpu().numpy().flatten())

            for value in param.data.flatten():
                self.initial_weights[f"x{counter}"] = (
                    value.item()
                )  # Convert tensor value to Python scalar
                counter += 1

        # Define the search space
        self.search_space = {
            f"x{i}": np.arange(-1.0, 1.0, 0.1, dtype=np.float32)
            for i in range(len(self.params))
        }
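
        # Each of the 5569 dimensions is discretized to the 20-point grid
        # np.arange(-1.0, 1.0, 0.1); weights outside [-1.0, 0.9] cannot be
        # represented exactly on this grid.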

        # Initialize the GFO optimizer
        self.optimizer = HillClimbingOptimizer(
            self.search_space, initialize={"warm_start": [self.initial_weights]}
        )

        self.optimizer.init_search(
            objective_function=self.objective_function,
            n_iter=num_epochs * len(dataloader),
            max_time=None,
            max_score=None,
            early_stopping=None,
            memory=True,
            memory_warm_start=None,
            verbosity=[],
        )

        defaults = dict(lr=lr)
        super().__init__(params, defaults)
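
    # One search_step() is taken per training batch, so the n_iter budget above
    # (num_epochs * len(dataloader) = 10 * 16 = 160 iterations) exactly covers
    # the training loop at the bottom of the file.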

    def objective_function(self, opt_params):
        opt_params_l = list(opt_params.values())

        # Set model parameters
        start = 0
        for param in self.model.parameters():
            param_length = param.numel()
            param.data = torch.tensor(
                opt_params_l[start : start + param_length]
            ).view(param.shape)
            start += param_length

        # Compute the loss
        total_loss = 0.0
        with torch.no_grad():
            for batch_X, batch_y in self.dataloader:
                outputs = self.model(batch_X)
                loss = self.criterion(outputs, batch_y)
                total_loss += loss.item()
        # gradient-free-optimizers maximizes the score, so return the
        # negated mean loss
        return -total_loss / len(self.dataloader)

    def step(self, closure=None):
        if closure is not None:
            closure()

        # Use GFO to find the best parameters
        self.optimizer.search_step()
        # NOTE: pos_new holds the new position on the search-space grid rather
        # than the raw parameter values, which appears to be why the copy back
        # into the model below is commented out
        best_params = self.optimizer.pos_new

        print("self.optimizer.score_new", self.optimizer.score_new)

        # Set the best parameters to the model
        start = 0
        for param in self.model.parameters():
            param_length = param.numel()
            """
            param.data.copy_(
                torch.tensor(
                    best_params[start : start + param_length],
                    dtype=torch.float32,
                ).view(param.shape)
            )
            """
            start += param_length

        self.params = best_params

# Initialize the custom optimizer
optimizer = GFOOptimizer(
    model.parameters(), model, dataloader, criterion, lr=0.01
)
# optimizer = torch.optim.SGD(model.parameters(), lr=0.1, momentum=0.9)

# Training loop
for epoch in range(num_epochs):
    for batch_X, batch_y in dataloader:
        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)

        # Backward pass (the gradient-free step does not use these gradients;
        # kept for drop-in parity with gradient-based optimizers)
        loss.backward()

        # Update the weights
        optimizer.step()

    # Print the loss for every epoch
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}")

print("Training completed!")
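
For reference, a minimal standalone sketch of the gradient_free_optimizers convention that the objective function above relies on: the library maximizes the returned score, so loss-like quantities must be negated. The parabola objective and the one-dimensional search space here are illustrative only.

import numpy as np
from gradient_free_optimizers import HillClimbingOptimizer


def objective(para):
    # GFO maximizes the returned score, so negate a loss-like quantity
    return -(para["x"] ** 2)


search_space = {"x": np.arange(-5.0, 5.0, 0.1)}

opt = HillClimbingOptimizer(search_space)
opt.search(objective, n_iter=100)
print(opt.best_para)  # expected to be near {"x": 0.0}

The same convention is why GFOOptimizer.objective_function returns the negated mean loss.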