#!/usr/bin/env python
# -*- coding: utf-8 -*-

import logging, os
logging.basicConfig(level=logging.INFO)
import theano.tensor as T

from deepy.dataset import MnistDataset, MiniBatches
from deepy.networks import BasicNetwork
from deepy.layers import Dense, Softmax, NeuralLayer, Chain
from deepy.trainers import MomentumTrainer, LearningRateAnnealer
from deepy.utils import AutoEncoderCost, CrossEntropyCost, ErrorRateCost

model_path = os.path.join(os.path.dirname(__file__), "models", "tutorial2.gz")


class MyJointTrainingModel(NeuralLayer):
    """
    A customized model that trains an auto-encoder and MLP classifier simultaneously.
    """

    def __init__(self, internal_layer_size=100):
        super(MyJointTrainingModel, self).__init__("my joint-training model")
        self.internal_layer_size = internal_layer_size
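
    # How the pieces fit together (wired up in `prepare`, used in `output`):
    #
    #   input (784) -> encoder -> hidden code (100) -> decoder    -> reconstruction (784)
    #                                              \-> classifier -> softmax over 10 digits
    #
    # The encoder is shared, so both the reconstruction cost and the
    # classification cost backpropagate through it.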

    def prepare(self):
        """
        All code that creates parameters should be put into the 'prepare' method.
        """
        self.output_dim = 10
        self.encoder = Chain(self.input_dim).stack(Dense(self.internal_layer_size, 'tanh'))
        self.decoder = Chain(self.internal_layer_size).stack(Dense(self.input_dim))
        self.classifier = Chain(self.internal_layer_size).stack(Dense(50, 'tanh'),
                                                                Dense(self.output_dim),
                                                                Softmax())

        self.register_inner_layers(self.encoder, self.decoder, self.classifier)

        self.target_input = T.ivector('target')
        self.register_external_inputs(self.target_input)

    def output(self, x):
        """
        Build the computation graph here.
        """
        internal_variable = self.encoder.output(x)

        decoding_output = self.decoder.output(internal_variable)

        classification_output = self.classifier.output(internal_variable)

        auto_encoder_cost = AutoEncoderCost(decoding_output, x).get()

        classification_cost = CrossEntropyCost(classification_output, self.target_input).get()

        # Joint objective: a weighted sum of both costs, with the
        # reconstruction term down-weighted by a factor of 0.01.
        final_cost = 0.01 * auto_encoder_cost + classification_cost

        error_rate = ErrorRateCost(classification_output, self.target_input).get()

        self.register_monitors(("err", error_rate),
                               ("encoder_cost", auto_encoder_cost),
                               ("classify_cost", classification_cost))

        return final_cost


if __name__ == '__main__':
    # Wrap the custom joint-training model in a network over 28x28 MNIST images.
    model = BasicNetwork(input_dim=28*28, model=MyJointTrainingModel())

    # Serve MNIST in mini-batches of 20 samples.
    mnist = MiniBatches(MnistDataset(), batch_size=20)

    # Momentum trainer with L2 weight regularization.
    trainer = MomentumTrainer(model, {'weight_l2': 0.0001})

    # Train, annealing the learning rate over the course of training.
    trainer.run(mnist, controllers=[LearningRateAnnealer(trainer)])

    model.save_params(model_path)
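
    # A minimal reload sketch (assumption: BasicNetwork exposes `load_params`
    # as the counterpart of `save_params`; check the deepy API if it differs):
    #
    #   model = BasicNetwork(input_dim=28*28, model=MyJointTrainingModel())
    #   model.load_params(model_path)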