_setup_params()   B
last analyzed

Complexity

Conditions 1

Size

Total Lines 31

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 31
rs 8.8571
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
import os
4
5
import numpy as np
6
from numpy import linalg as LA
7
from theano import tensor as T
8
import theano
9
from theano.tensor.shared_randomstreams import RandomStreams
10
11
from deepy import NeuralClassifier, NetworkConfig
12
from deepy.utils import build_activation, disconnected_grad
13
from deepy.utils.functions import FLOATX
14
from deepy.networks import NeuralLayer
15
from experiments.attention_models.gaussian_sampler import SampleMultivariateGaussian
16
17
18
class AttentionLayer(NeuralLayer):
    """
    Recurrent glimpse-based layer operating on a single 28x28 image.

    A five-step scan repeatedly crops a patch around the current focus
    location, updates a 256-wide hidden state, proposes the next location and
    computes a 10-way softmax.  The softmax of the last step is the output.
    """

    def __init__(self, activation='relu', std=0.1, disable_reinforce=False, random_glimpse=False):
        """
        Parameters:
            activation - activation name forwarded to NeuralLayer.__init__
            std - value placed on the diagonal of the 2x2 sampling covariance
            disable_reinforce - if True, the predicted location is used as-is
                                and no log-likelihood gradient is computed
            random_glimpse - if True and disable_reinforce is also True, the
                             next location is drawn uniformly at random
        """
        self.disable_reinforce = disable_reinforce
        self.random_glimpse = random_glimpse
        self.gaussian_std = std
        # 10 is forwarded as the first NeuralLayer argument (presumably the
        # output size, matching the 10-way softmax -- confirm in NeuralLayer).
        super(AttentionLayer, self).__init__(10, activation)

    def connect(self, config, vars, x, input_n, id="UNKNOWN"):
        """
        Store the wiring information, then create parameters and the graph.

        Parameters:
            config - stored as self._config
            vars - stored as self._vars
            x - symbolic input, reshaped to 28x28 when the graph is built
            input_n - stored as self.input_n
            id - stored as self.id
        """
        self._config = config
        self._vars = vars
        self.input_n = input_n
        self.id = id
        self.x = x
        # Parameters must exist before the output graph that uses them.
        self._setup_params()
        self._setup_functions()
        self.connected = True

    @staticmethod
    def _clamp_glimpse_origin(origin, limit):
        """
        Clamp integer coordinates into [0, limit - 1] with mask arithmetic.

        Parameters:
            origin - integer coordinates (symbolic)
            limit - exclusive upper bound
        Returns:
            coordinates where negatives become 0 and values >= limit become
            limit - 1
        """
        origin = origin * (origin >= 0)
        return origin * (origin < limit) + (origin >= limit) * (limit - 1)

    @staticmethod
    def _crop_glimpse(image, origin, size):
        """
        Cut a size x size window out of image with its top-left at origin.

        Parameters:
            image - 2D image
            origin - (row, column) of the top-left corner
            size - window edge length
        Returns:
            the cropped window
        """
        rows = image[origin[0]: origin[0] + size]
        return rows[:, origin[1]: origin[1] + size]

    def _glimpse_sensor(self, x_t, l_p):
        """
        Multi-scale sensor: 4x4, 8x8 and 16x16 windows around the focus point,
        the two larger ones max-pooled down to 4x4.

        Parameters:
            x_t - 28x28 image
            l_p - 2x1 focus vector
        Returns:
            the three 4x4 patches concatenated along the first axis
        """
        # Turn l_p into the top-left corner of the 4x4 window.
        origin_1 = T.cast(T.round(l_p * 14 + 14 - 2), "int32")
        origin_1 = self._clamp_glimpse_origin(origin_1, 24)
        # The larger windows are offset so they surround the 4x4 window.
        origin_2 = self._clamp_glimpse_origin(origin_1 - 2, 20)
        # NOTE(review): a 16-pixel window starting at 15 runs past a 28-pixel
        # image, so the slice below is silently truncated there -- confirm
        # whether the bound should be smaller.
        origin_3 = self._clamp_glimpse_origin(origin_1 - 6, 16)

        glimpse_1 = self._crop_glimpse(x_t, origin_1, 4)
        glimpse_2 = self._crop_glimpse(x_t, origin_2, 8)
        glimpse_2 = theano.tensor.signal.downsample.max_pool_2d(glimpse_2, (2,2))
        glimpse_3 = self._crop_glimpse(x_t, origin_3, 16)
        glimpse_3 = theano.tensor.signal.downsample.max_pool_2d(glimpse_3, (4,4))
        return T.concatenate([glimpse_1, glimpse_2, glimpse_3])

    def _refined_glimpse_sensor(self, x_t, l_p):
        """
        Single-scale sensor: one 7x7 window around the focus point.

        Parameters:
            x_t - 28x28 image
            l_p - 2x1 focus vector
        Returns:
            7x7 matrix
        """
        # Turn l_p into the top-left corner of the window.
        origin = T.cast(T.round(l_p * 14 + 14 - 4), "int32")
        origin = self._clamp_glimpse_origin(origin, 21)
        return self._crop_glimpse(x_t, origin, 7)

    def _multi_gaussian_pdf(self, vec, mean):
        """
        Density of a 2D Gaussian evaluated at vec.

        Uses the precomputed covariance inverse and determinant stored in
        self.cov_inv_var and self.cov_det_var.

        Parameters:
            vec - point at which the density is evaluated
            mean - mean of the Gaussian
        Returns:
            scalar density value
        """
        diff = vec - mean
        normalizer = 1.0 / T.sqrt((2*np.pi)**2 * self.cov_det_var)
        exponent = -0.5 * (diff.T.dot(self.cov_inv_var).dot(diff))
        return normalizer * T.exp(exponent)

    def _glimpse_network(self, x_t, l_p):
        """
        Combine what is seen at l_p with where l_p is.

        Parameters:
            x_t - 28x28 image
            l_p - 2x1 focus vector
        Returns:
            glimpse feature vector (256 wide, per the W_g2_* shapes)
        """
        sensor_output = T.flatten(self._refined_glimpse_sensor(x_t, l_p))
        # self._relu is whatever _setup_functions bound to that name.
        h_g = self._relu(T.dot(sensor_output, self.W_g0))
        h_l = self._relu(T.dot(l_p, self.W_g1))
        return self._relu(T.dot(h_g, self.W_g2_hg) + T.dot(h_l, self.W_g2_hl))

    def _location_network(self, h_t):
        """
        Parameters:
            h_t - 256x1 vector
        Returns:
            2x1 focus vector
        """
        # NOTE(review): self.B_l is created in _setup_params but not added here.
        return T.dot(h_t, self.W_l)

    def _action_network(self, h_t):
        """
        Parameters:
            h_t - 256x1 vector
        Returns:
            10x1 vector (softmax output)
        """
        z = self._relu(T.dot(h_t, self.W_a) + self.B_a)
        return self._softmax(z)

    def _core_network(self, l_p, h_p, x_t):
        """
        One scan step: glimpse, update the hidden state, pick the next
        location and compute the class distribution.

        Parameters:
            x_t - 28x28 image
            l_p - 2x1 focus vector
            h_p - 256x1 vector
        Returns:
            next location, h_t (256x1 vector), class distribution, and the
            gradient term for W_l
        """
        g_t = self._glimpse_network(x_t, l_p)
        h_t = self._tanh(T.dot(g_t, self.W_h_g) + T.dot(h_p, self.W_h) + self.B_h)
        l_t = self._location_network(h_t)

        if self.disable_reinforce:
            sampled_l_t = l_t
            # Stand-in with the shape of W_l so the scan still has four outputs.
            wl_grad = self.W_l
            if self.random_glimpse:
                sampled_l_t = self.srng.uniform((2,)) * 0.8
        else:
            sampled_l_t = self._sample_gaussian(l_t, self.cov)
            # Gradient of the log-density of the sampled location w.r.t. W_l;
            # the sample itself is wrapped in disconnected_grad.
            sampled_pdf = self._multi_gaussian_pdf(disconnected_grad(sampled_l_t), l_t)
            wl_grad = T.grad(T.log(sampled_pdf), self.W_l)

        a_t = self._action_network(h_t)

        return sampled_l_t, h_t, a_t, wl_grad

    def _output_func(self):
        """
        Build the five-step scan over _core_network.

        Also records self.positions (locations of every step),
        self.last_decision (argmax of the last distribution) and self.wl_grad
        (the per-step W_l gradient terms averaged over steps).

        Returns:
            the last step's class distribution reshaped to (1, 10)
        """
        self.x = self.x.reshape((28, 28))
        [l_ts, h_ts, a_ts, wl_grads], _ = theano.scan(fn=self._core_network,
                         outputs_info=[self.l0, self.h0, None, None],
                         non_sequences=[self.x],
                         n_steps=5)

        self.positions = l_ts
        self.last_decision = T.argmax(a_ts[-1])
        self.wl_grad = T.sum(wl_grads, axis=0) / wl_grads.shape[0]
        return a_ts[-1].reshape((1,10))

    def _setup_functions(self):
        """Bind the activation functions and build the output graph."""
        self._assistive_params = []
        # NOTE(review): the attribute is named _relu but is built from "tanh";
        # left unchanged because switching it would alter the model -- confirm intent.
        self._relu = build_activation("tanh")
        self._tanh = build_activation("tanh")
        self._softmax = build_activation("softmax")
        self.output_func = self._output_func()

    def _setup_params(self):
        """Create the random stream, the covariance constants and all weights."""
        self.srng = RandomStreams(seed=234)
        self.large_cov = np.array([[0.06,0],[0,0.06]], dtype=FLOATX)
        self.small_cov = np.array([[self.gaussian_std,0],[0,self.gaussian_std]], dtype=FLOATX)
        # Shared copies of the covariance, its inverse and its determinant,
        # used by the sampler and by _multi_gaussian_pdf.
        self.cov = theano.shared(np.array(self.small_cov, dtype=FLOATX))
        self.cov_inv_var = theano.shared(np.array(LA.inv(self.small_cov), dtype=FLOATX))
        self.cov_det_var = theano.shared(np.array(LA.det(self.small_cov), dtype=FLOATX))
        self._sample_gaussian = SampleMultivariateGaussian()

        # Glimpse network: flattened 7x7 patch and 2D location -> 256 features.
        self.W_g0 = self.create_weight(7*7, 128, suffix="g0")
        self.W_g1 = self.create_weight(2, 128, suffix="g1")
        self.W_g2_hg = self.create_weight(128, 256, suffix="g2_hg")
        self.W_g2_hl = self.create_weight(128, 256, suffix="g2_hl")

        # Recurrent core and the initial scan state.
        self.W_h_g = self.create_weight(256, 256, suffix="h_g")
        self.W_h = self.create_weight(256, 256, suffix="h")
        self.B_h = self.create_bias(256, suffix="h")
        self.h0 = self.create_vector(256, "h0")
        self.l0 = self.create_vector(2, "l0")
        self.l0.set_value(np.array([-1, -1], dtype=FLOATX))

        # Location network (initial weights scaled down by 10) and action network.
        self.W_l = self.create_weight(256, 2, suffix="l")
        self.W_l.set_value(self.W_l.get_value() / 10)
        # NOTE(review): B_l is created but not used by _location_network and
        # not listed in self.B below.
        self.B_l = self.create_bias(2, suffix="l")
        self.W_a = self.create_weight(256, 10, suffix="a")
        self.B_a = self.create_bias(10, suffix="a")

        # W_l is kept out of self.W and listed separately in self.parameters.
        self.W = [self.W_g0, self.W_g1, self.W_g2_hg, self.W_g2_hl, self.W_h_g, self.W_h, self.W_a]
        self.B = [self.B_h, self.B_a]
        self.parameters = [self.W_l]
201
202
203
def get_network(model=None, std=0.005, disable_reinforce=False, random_glimpse=False):
    """
    Build a classifier over 28*28 inputs with a single AttentionLayer.

    Parameters:
        model - optional path to saved parameters; they are loaded only when
                the path exists, otherwise the argument is silently ignored
        std - forwarded to AttentionLayer
        disable_reinforce - forwarded to AttentionLayer
        random_glimpse - forwarded to AttentionLayer
    Returns:
        network
    """
    network = NeuralClassifier(input_dim=28*28)
    attention = AttentionLayer(std=std, disable_reinforce=disable_reinforce, random_glimpse=random_glimpse)
    network.stack_layer(attention)
    if model and os.path.exists(model):
        network.load_params(model)
    return network
216
217