Completed
Push to master ( f73e69...91b7c0 ) by Raphael, created 01:35

experiments.variational_autoencoder.VariationalAutoEncoder (Rating: A)

Complexity

Total Complexity   4

Size/Duplication

Total Lines        32
Duplicated Lines   0 %

Metric   Value
dl       0
loc      32
rs       10
wmc      4

3 Methods

Rating   Name                                                       Duplication   Size   Complexity
A        VariationalAutoEncoder.__init__()                          0             6      1
A        VariationalAutoEncoder.stack_reparameterization_layer()    0             7      1
A        VariationalAutoEncoder._cost_func()                        0             10     2
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import theano.tensor as T
from deepy import NeuralLayer, AutoEncoder, Dense
from deepy import GaussianInitializer, global_theano_rand


class ReparameterizationLayer(NeuralLayer):
    """
    Reparameterization layer in a variational encoder.
    Only the binary output cost function is supported for now.
    The prior value is recorded after the computation graph is created.
    """

    def __init__(self, size, sample=False):
        """
        :param size: the size of the latent variable
        :param sample: if True, return the noise-free mean as the latent variable instead of sampling
        """
        super(ReparameterizationLayer, self).__init__("VariationalEncoder")
        self.size = size
        self.output_dim = size
        self.sample = sample
        self._prior = None

    def prepare(self):
        # Two linear heads map the input to the mean and log variance
        # of the approximate posterior q(z|x)
        self._mu_encoder = Dense(self.size, 'linear', init=GaussianInitializer(), random_bias=True).initialize(
            self.input_dim)
        self._log_sigma_encoder = Dense(self.size, 'linear', init=GaussianInitializer(), random_bias=True).initialize(
            self.input_dim)
        self.register_inner_layers(self._mu_encoder, self._log_sigma_encoder)

    def compute_tensor(self, x):
        # Compute the parameters of the approximate posterior q(z|x)
        mu = self._mu_encoder.compute_tensor(x)
        log_sigma = 0.5 * self._log_sigma_encoder.compute_tensor(x)
        # Negative KL divergence between q(z|x) and the standard normal prior
        self._prior = 0.5 * T.sum(1 + 2 * log_sigma - mu ** 2 - T.exp(2 * log_sigma))
        # Reparameterization trick: z = mu + sigma * eps with eps ~ N(0, I),
        # so the stochasticity sits in eps and gradients flow through mu and log_sigma
        eps = global_theano_rand.normal((x.shape[0], self.size))

        if self.sample:
            # Return the noise-free mean instead of a sample
            z = mu
        else:
            z = mu + T.exp(log_sigma) * eps
        return z

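    # Note: for q(z|x) = N(mu, diag(sigma^2)) and the prior p(z) = N(0, I),
    # the KL divergence has the closed form
    #     KL(q || p) = -0.5 * sum(1 + log(sigma^2) - mu^2 - sigma^2),
    # so `_prior` above holds -KL, the term added to the reconstruction
    # log-likelihood to form the variational lower bound.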
    def prior(self):
        """
        Get the recorded prior value (the negative KL term).
        """
        return self._prior


class VariationalAutoEncoder(AutoEncoder):
    """
    Variational auto-encoder.
    Only the binary output cost function is supported for now.
    """

    def __init__(self, input_dim, input_tensor=None, sample=False):
        """
        :param input_dim: the dimension of the input
        :param sample: passed to the reparameterization layer; if True, use the
            noise-free mean as the latent variable
        """
        super(VariationalAutoEncoder, self).__init__(input_dim)
        self.sample = sample
        self._setup_monitors = True

    def stack_reparameterization_layer(self, layer_size):
        """
        Stack a layer that performs the reparameterization trick on the latent variable.
        :param layer_size: the size of the latent variable
        """
        self.rep_layer = ReparameterizationLayer(layer_size, sample=self.sample)
        self.stack_encoders(self.rep_layer)

    def _cost_func(self, y):
        # Reconstruction term: the Bernoulli log-likelihood log p(x|z),
        # i.e. the negative binary cross-entropy against the input
        logpxz = - T.nnet.binary_crossentropy(y, self.input_variables[0]).sum()
        # Variational lower bound: log p(x|z) - KL(q(z|x) || p(z))
        logp = logpxz + self.rep_layer.prior()
        # The reported lower bound is the mean value of logp over the batch
        cost = - logp
        if self._setup_monitors:
            self._setup_monitors = False
            self.training_monitors.append(("lower_bound", logp / y.shape[0]))
            self.testing_monitors.append(("lower_bound", logp / y.shape[0]))
        return cost
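
A minimal usage sketch of how this class might be assembled, assuming 784-dimensional binarized inputs (e.g. MNIST). `stack_decoders` and the trainer are assumptions about the deepy API rather than calls confirmed by this file; only `VariationalAutoEncoder`, `stack_reparameterization_layer`, `stack_encoders`, and `Dense` appear above.

from deepy import Dense
from deepy.trainers import SGDTrainer  # assumed import path

# Encoder: 784 -> 500 -> latent z of size 100
model = VariationalAutoEncoder(input_dim=784)
model.stack_encoders(Dense(500, 'tanh'))
model.stack_reparameterization_layer(100)
# Decoder: 100 -> 500 -> 784; a sigmoid output matches the binary cross-entropy cost
model.stack_decoders(Dense(500, 'tanh'))   # assumed counterpart to stack_encoders
model.stack_decoders(Dense(784, 'sigmoid'))

trainer = SGDTrainer(model)    # assumed trainer class
trainer.run(train_data)        # train_data is a hypothetical dataset iterator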