_cost_func()   A
last analyzed

Metric                   Value
cc (conditions)          2
dl (duplicated lines)    0
loc (total lines)        10
rs                       9.4286
Duplication ratio        0 %
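
These per-function figures can be reproduced locally with a radon-style analyzer. Below is a minimal sketch assuming the radon package is installed (pip install radon); the file name is hypothetical, and only the cc and loc figures are covered:

# Sketch assuming the radon package; cc_visit reports per-function
# cyclomatic complexity, radon.raw.analyze reports line counts.
from radon.complexity import cc_visit
from radon.raw import analyze

source = open("variational_autoencoder.py").read()  # hypothetical file name

for block in cc_visit(source):
    print(block.name, block.complexity)   # e.g. "_cost_func 2"

print("total lines:", analyze(source).loc)

The analyzed source follows.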
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import theano.tensor as T
from deepy import NeuralLayer, AutoEncoder, Dense
from deepy import GaussianInitializer, global_theano_rand


class ReparameterizationLayer(NeuralLayer):
    """
    Reparameterization layer in a variational auto-encoder.
    Only the binary output cost function is supported for now.
    The prior value is recorded after the computation graph is created.
    """

    def __init__(self, size, sample=False):
        """
        :param size: the size of the latent variable
        :param sample: when True, skip the noise and return the mean (a deterministic latent variable)
        """
        super(ReparameterizationLayer, self).__init__("VariationalEncoder")
        self.size = size
        self.output_dim = size
        self.sample = sample
        self._prior = None

    def prepare(self):
        # Two linear heads on top of the incoming representation:
        # one for the mean and one for the log-variance of q(z|x).
        self._mu_encoder = Dense(self.size, 'linear', init=GaussianInitializer(), random_bias=True).connect(self.input_dim)
        self._log_sigma_encoder = Dense(self.size, 'linear', init=GaussianInitializer(), random_bias=True).connect(self.input_dim)
        self.register_inner_layers(self._mu_encoder, self._log_sigma_encoder)

    def output(self, x):
        # Compute the parameters of q(z|x)
        mu = self._mu_encoder.output(x)
        # The head predicts log(sigma^2); halving it gives log(sigma)
        log_sigma = 0.5 * self._log_sigma_encoder.output(x)
        # Closed-form negative KL divergence against the unit Gaussian prior:
        # -KL(q(z|x) || N(0, I)) = 0.5 * sum(1 + log(sigma^2) - mu^2 - sigma^2)
        self._prior = 0.5 * T.sum(1 + 2 * log_sigma - mu ** 2 - T.exp(2 * log_sigma))
        # Reparameterization trick: z = mu + sigma * eps, with eps ~ N(0, I)
        eps = global_theano_rand.normal((x.shape[0], self.size))

        if self.sample:
            z = mu
        else:
            z = mu + T.exp(log_sigma) * eps
        return z

    def prior(self):
        """
        Get the prior value.
        """
        return self._prior
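
The _prior term above is the analytic negative KL divergence between the diagonal Gaussian q(z|x) = N(mu, sigma^2) and the unit Gaussian prior. A minimal NumPy sketch (the scalars mu and log_sigma are illustrative, not taken from deepy) checks that expression against a Monte Carlo estimate built from the same reparameterized samples:

import numpy as np

rng = np.random.default_rng(0)
mu, log_sigma = 0.7, -0.3  # illustrative parameters of q(z|x)

# Analytic negative KL, matching ReparameterizationLayer.output()
neg_kl = 0.5 * (1 + 2 * log_sigma - mu ** 2 - np.exp(2 * log_sigma))

# Monte Carlo estimate: E_q[log p(z) - log q(z)] with z = mu + sigma * eps
eps = rng.standard_normal(1000000)
z = mu + np.exp(log_sigma) * eps
log_q = -0.5 * (np.log(2 * np.pi) + 2 * log_sigma + (z - mu) ** 2 / np.exp(2 * log_sigma))
log_p = -0.5 * (np.log(2 * np.pi) + z ** 2)
neg_kl_mc = (log_p - log_q).mean()

print(neg_kl, neg_kl_mc)  # the two values should agree closely

The VariationalAutoEncoder class below wires this layer into deepy's AutoEncoder and supplies the variational cost function.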
class VariationalAutoEncoder(AutoEncoder):
    """
    Variational auto-encoder.
    Only the binary output cost function is supported for now.
    """

    def __init__(self, input_dim, latent_dim=None, sample=False):
        """
        :param latent_dim: size of the latent variable; not required for training
        :param sample: add no randomness to the latent variable when this flag is on
        """
        super(VariationalAutoEncoder, self).__init__(input_dim, rep_dim=latent_dim)
        self.sample = sample
        self._setup_monitors = True

    def stack_reparameterization_layer(self, layer_size):
        """
        Perform the reparameterization trick on the latent variables.
        :param layer_size: the size of the latent variable
        """
        self.rep_layer = ReparameterizationLayer(layer_size, sample=self.sample)
        self.stack_encoders(self.rep_layer)

    def _cost_func(self, y):
        # Reconstruction term log p(x|z), for binary inputs
        logpxz = -T.nnet.binary_crossentropy(y, self.input_variables[0]).sum()
        # Variational lower bound: log p(x|z) - KL(q(z|x) || p(z))
        logp = logpxz + self.rep_layer.prior()
        cost = -logp
        if self._setup_monitors:
            self._setup_monitors = False
            # The monitored lower bound is logp averaged over the batch
            self.training_monitors.append(("lower_bound", logp / y.shape[0]))
            self.testing_monitors.append(("lower_bound", logp / y.shape[0]))
        return cost
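
For orientation, here is a hypothetical usage sketch, not taken from the deepy documentation: it assumes AutoEncoder also exposes a stack_decoders counterpart to the stack_encoders call used above, and the layer sizes are illustrative only.

from deepy import Dense

# Hypothetical end-to-end construction; stack_decoders is an assumed API.
model = VariationalAutoEncoder(input_dim=784, latent_dim=20)

# Encoder: hidden representation, then the reparameterization layer
model.stack_encoders(Dense(400, 'tanh'))
model.stack_reparameterization_layer(20)

# Decoder: sigmoid outputs match the binary cross-entropy in _cost_func()
model.stack_decoders(Dense(400, 'tanh'), Dense(784, 'sigmoid'))

Because _cost_func() uses binary_crossentropy, the decoder's final activation should be a sigmoid so the outputs are valid Bernoulli parameters.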