_setup_params()   B
last analyzed

Complexity

Conditions 6

Size

Total Lines 21

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 6
dl 0
loc 21
rs 7.8868
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
4
5
import logging as loggers
6
7
import theano
8
import theano.tensor as T
9
10
from deepy.utils import build_activation
11
from deepy.trainers import THEANO_LINKER
12
from deepy.layers.layer import NeuralLayer
13
14
logging = loggers.getLogger(__name__)
15
16
from network import NeuralNetwork
17
18
# TODO: repair additional_h mode
19
class RecursiveAutoEncoder(NeuralNetwork):
    """
    Recursive auto encoder (recursively encode a sequence by combining two children).

    Parameters:
        input_dim - dimension of each input vector in the sequence
        rep_dim - dimension of representation
        activation - name of the activation function used by the core layer
        unfolding - if True, reconstruct the whole prefix at every merge step
        additional_h - use a trainable initial hidden vector instead of x[0]
        config - optional network configuration forwarded to NeuralNetwork
    """
    def __init__(self, input_dim, rep_dim=None, activation='tanh', unfolding=True, additional_h=False,
                 config=None):
        super(RecursiveAutoEncoder, self).__init__(input_dim, config=config, input_tensor=3)

        self.rep_dim = rep_dim
        # Pass `activation` through to the core: the constructor previously
        # accepted it but never forwarded it, so the core always fell back to
        # its own default no matter what the caller requested.
        self.stack(RecursiveAutoEncoderCore(rep_dim, activation=activation,
                                            unfolding=unfolding, additional_h=additional_h))
        # Compiled Theano functions, built lazily on first use.
        self._encode_func = None
        self._decode_func = None

    def _cost_func(self, y):
        # The core layer's output already IS the reconstruction cost,
        # so the network-level cost is the identity of the output.
        return y

    @property
    def cost(self):
        return self._cost_func(self.output)

    @property
    def test_cost(self):
        return self._cost_func(self.test_output)

    def encode(self, x):
        """
        Encode given input sequence into its final representation vector.

        x - a sequence of row vectors (one row per terminal node).
        """
        if not self._encode_func:
            # BUG FIX: the symbolic graph must be built over the symbolic input
            # variable, not over the concrete argument `x` — the original code
            # called `encode_func(x)`, leaving `x_var` unused and freezing the
            # first call's data into the compiled graph. The input is a
            # sequence of vectors, i.e. a matrix (encode_func indexes rows
            # x[0] and x[i]), hence T.matrix() rather than T.vector().
            x_var = T.matrix()
            self._encode_func = theano.function([x_var], self.layers[0].encode_func(x_var),
                            allow_input_downcast=True, mode=theano.Mode(linker=THEANO_LINKER))
        return self._encode_func(x)

    def decode(self, rep, n_steps):
        """
        Decode given representation back into n_steps reconstructed vectors.
        """
        if not self._decode_func:
            rep_var = T.vector()
            n_var = T.iscalar()
            self._decode_func = theano.function([rep_var, n_var], self.layers[0].decode_func(rep_var, n_var),
                            allow_input_downcast=True, mode=theano.Mode(linker=THEANO_LINKER))
        return self._decode_func(rep, n_steps)
66
67
class RecursiveAutoEncoderCore(NeuralLayer):
    # Core computation of the recursive auto-encoder: folds a sequence of
    # vectors into one representation and outputs the total reconstruction cost.

    def __init__(self, rep_dim=None, activation='tanh', unfolding=True, additional_h=True):
        """
        Binarized Recursive Encoder Core layer.

        Input:
        A sequence of terminal nodes in vector representations.
        Output:
        Cost (scalar reconstruction error summed over all merge steps).

        Parameters:
            rep_dim - dimension of the internal representation; defaults to
                input_dim when None or negative (see _setup_params)
            activation - activation name, resolved via build_activation
            unfolding - if True, reconstruct the entire prefix x[:i+1] at each
                merge step; otherwise only reconstruct the two direct children
            additional_h - if True, seed the recursion with a created vector h0
                instead of x[0]
        """
        super(RecursiveAutoEncoderCore, self).__init__("RAE")
        self.rep_dim = rep_dim
        self.unfolding = unfolding
        self.additional_h = additional_h
        self.activation = activation

    def prepare(self):
        # Framework hook: create the weights first, then the activation
        # callables that _recursive_step relies on.
        self._setup_params()
        self._setup_functions()

    def output(self, x):
        # The layer's output is the total reconstruction cost; the final
        # representation is only exposed through a monitor.
        rep, cost = self._recursive_func(x)
        self.register_monitors(("mean(rep)", abs(rep).mean()))
        return cost

    def _recursive_step(self, i, p, x):
        # One scan step: merge the running parent `p` with the next input
        # vector x[i] into a new representation, and measure how well that
        # representation reconstructs what it encodes.
        x_t = x[i]
        # Encoding
        rep = self._activation_func(T.dot(p, self.W_e1) + T.dot(x_t, self.W_e2) + self.B_e)
        if self.unfolding:
            # Unfolding mode: decode `rep` all the way back down and compare
            # against the i+1 input rows merged so far.
            x_decs = self._unfold(rep, i)
            distance = T.sum((x_decs - x[: i + 1]) ** 2)
        else:
            # Decoding
            p_dec, x_dec = self._decode_step(rep)
            # Euclidean distance
            distance = T.sum((p_dec - p)**2 + (x_dec - x_t)**2)
        return rep, distance

    def _unfold(self, p, n):
        # Repeatedly split `p` into (decoded parent, decoded leaf) pairs and
        # return the reconstructed leaves oldest-first (hence the [::-1]).
        if self.additional_h:
            n += 1
        [ps, xs], _ = theano.scan(self._decode_step, outputs_info=[p, None], n_steps=n)
        if self.additional_h:
            return xs[::-1]
        else:
            # Without additional_h, the recursion was seeded with x[0] itself,
            # so the deepest decoded parent stands in for the first leaf,
            # giving n + 1 reconstructed rows in total.
            return T.concatenate([xs, [ps[-1]]])[::-1]

    def _recursive_func(self, x):
        # Return total error
        if self.additional_h:
            h0 = self.h0
            start_index = 0
        else:
            # Seed the recursion with the first input vector and start merging
            # from the second row.
            h0 = x[0]
            start_index = 1
        [reps, distances], _ = theano.scan(self._recursive_step, sequences=[T.arange(start_index, x.shape[0])],
                                           outputs_info=[h0, None], non_sequences=[x])
        return reps[-1], T.sum(distances)

    def encode_func(self, x):
        # Same scan as _recursive_func, but only the final representation is
        # kept; the per-step distances are discarded.
        if self.additional_h:
            h0 = self.h0
            start_index = 0
        else:
            h0 = x[0]
            start_index = 1
        [reps, _], _ = theano.scan(self._recursive_step, sequences=[T.arange(start_index, x.shape[0])],
                                           outputs_info=[h0, None], non_sequences=[x])
        return reps[-1]

    def _decode_step(self, p):
        # Inverse of one encoding step: split a representation into a decoded
        # parent representation and a decoded leaf vector.
        p_dec = self._activation_func(T.dot(p, self.W_d1) + self.B_d1)
        x_dec = self._activation_func(T.dot(p, self.W_d2) + self.B_d2)
        return  p_dec, x_dec

    def decode_func(self, rep, n):
        # Symbolic decoder used by RecursiveAutoEncoder.decode.
        return self._unfold(rep, n)

    def _setup_functions(self):
        self._assistive_params = []
        self._activation_func = build_activation(self.activation)
        # NOTE(review): _softmax_func is built but never used anywhere in this
        # class — possibly a leftover; confirm before removing.
        self._softmax_func = build_activation('softmax')

    def _setup_params(self):
        # Default the representation size to the input size.
        if not self.rep_dim or self.rep_dim < 0:
            self.rep_dim = self.input_dim
        if not self.additional_h and self.rep_dim != self.input_dim:
            raise Exception("rep_dim must be input_dim when additional_h is not used")

        # Encoder: rep = act(p . W_e1 + x_t . W_e2 + B_e)
        self.W_e1 = self.create_weight(self.rep_dim, self.rep_dim, "enc1")
        self.W_e2 = self.create_weight(self.input_dim, self.rep_dim, "enc2")
        self.B_e = self.create_bias(self.rep_dim, "enc")

        # Decoder: p_dec = act(p . W_d1 + B_d1), x_dec = act(p . W_d2 + B_d2)
        self.W_d1 = self.create_weight(self.rep_dim, self.rep_dim, "dec1")
        self.W_d2 = self.create_weight(self.rep_dim, self.input_dim, "dec2")
        self.B_d1 = self.create_bias(self.rep_dim, "dec1")
        self.B_d2 = self.create_bias(self.input_dim, "dec2")

        self.h0 = None
        if self.additional_h:
            # NOTE(review): h0 is sized with output_dim, yet it seeds the scan
            # whose recurrent state is a rep_dim-sized representation — confirm
            # output_dim == rep_dim here, otherwise this looks like it should
            # be rep_dim. h0 is also absent from register_parameters below, so
            # it would not receive gradient updates — confirm that is
            # intentional (the file-level TODO says additional_h mode needs
            # repair).
            self.h0 = self.create_vector(self.output_dim, "h0")

        self.register_parameters(self.W_e1, self.W_e2, self.W_d1, self.W_d2,
                                 self.B_e, self.B_d1, self.B_d2)
172
173