#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np
import theano.tensor as T
from recurrent import RecurrentLayer
from deepy.utils import neural_computation, FLOATX
class PeepholeLSTM(RecurrentLayer):
    """
    Long short-term memory layer with peephole connections:
    the gates additionally observe the memory cell state
    (Gers & Schmidhuber, 2000).
    """

    def __init__(self, hidden_size, init_forget_bias=1, **kwargs):
        """
        :param hidden_size: number of hidden units (also the cell size)
        :param init_forget_bias: initial value of the forget-gate bias;
            a positive value biases the layer toward remembering early
            in training.
        """
        kwargs["hidden_size"] = hidden_size
        super(PeepholeLSTM, self).__init__("PLSTM", ["state", "lstm_cell"], **kwargs)
        # FIX: was hard-coded to 1, silently ignoring the constructor argument.
        self._init_forget_bias = init_forget_bias

    @neural_computation
    def compute_new_state(self, step_inputs):
        """
        One recurrence step.

        :param step_inputs: dict with the pre-computed input projections
            ("xi", "xf", "xc", "xo") plus the previous "state" (h_{t-1})
            and "lstm_cell" (c_{t-1}); the projections may be absent.
        :return: dict with the new "state" and "lstm_cell".
        """
        # FIX: the unpack order must match the key order; the original
        # swapped xo_t/xc_t, feeding the output-gate projection into the
        # cell candidate and vice versa.
        xi_t, xf_t, xc_t, xo_t, h_tm1, c_tm1 = map(
            step_inputs.get, ["xi", "xf", "xc", "xo", "state", "lstm_cell"])
        # No sequential input: the gates see only the recurrent terms.
        # (`is None` instead of truthiness — tensor truth value is unsafe.)
        if xi_t is None:
            xi_t, xf_t, xo_t, xc_t = 0, 0, 0, 0

        # LSTM core step
        hs = self.hidden_size
        dot_h = T.dot(h_tm1, self.U)
        # FIX: peephole connections look at the previous *cell* state, not
        # the hidden state — with h_tm1 here, C would be redundant with U
        # and this would not be a peephole LSTM at all.
        dot_c = T.dot(c_tm1, self.C)
        i_t = self.gate_activate(xi_t + dot_h[:, :hs] + self.b_i + dot_c[:, :hs])
        f_t = self.gate_activate(xf_t + dot_h[:, hs:hs*2] + self.b_f + dot_c[:, hs:hs*2])
        c_t = f_t * c_tm1 + i_t * self.activate(xc_t + dot_h[:, hs*2:hs*3] + self.b_c)
        # The output gate peeps at the (previous) cell via the third C block.
        o_t = self.gate_activate(xo_t + dot_h[:, hs*3:hs*4] + dot_c[:, hs*2:hs*3] + self.b_o)
        h_t = o_t * self.activate(c_t)

        return {"state": h_t, "lstm_cell": c_t}

    @neural_computation
    def merge_inputs(self, input_var, additional_inputs=None):
        """
        Project all inputs through W and split the result into the four
        gate pre-activations.

        :return: dict with keys "xi", "xf", "xc", "xo", or {} when there
            is no input at all.
        """
        if not additional_inputs:
            additional_inputs = []
        # Materialized as a list (not a lazy `filter`) so that indexing
        # below also works under Python 3.
        all_inputs = [v for v in [input_var] + additional_inputs if v]
        if not all_inputs:
            return {}
        last_dim_id = all_inputs[0].ndim - 1
        merged_input = T.concatenate(all_inputs, axis=last_dim_id)
        dot_input = T.dot(merged_input, self.W)
        # NOTE(review): the slicing below assumes a 3-d (time, batch, dim)
        # input — confirm against callers if 2-d inputs are possible.
        hs = self.hidden_size
        merged_inputs = {
            "xi": dot_input[:, :, :hs],
            "xf": dot_input[:, :, hs:hs*2],
            "xc": dot_input[:, :, hs*2:hs*3],
            "xo": dot_input[:, :, hs*3:hs*4],
        }
        return merged_inputs

    def prepare(self):
        """
        Create the parameters: W (input projection, 4 gate blocks),
        U (recurrent projection, 4 blocks), C (peephole weights for the
        i/f/o gates only) and the four gate biases.
        """
        if self._input_type == "sequence":
            all_input_dims = [self.input_dim] + self.additional_input_dims
        else:
            all_input_dims = self.additional_input_dims
        summed_input_dim = sum(all_input_dims, 0)
        self.output_dim = self.hidden_size

        self.W = self.create_weight(summed_input_dim, self.hidden_size * 4, "W", initializer=self.outer_init)
        self.U = self.create_weight(self.hidden_size, self.hidden_size * 4, "U", initializer=self.inner_init)
        # Only three peephole blocks (i, f, o); the cell candidate has none.
        self.C = self.create_weight(self.hidden_size, self.hidden_size * 3, "C", initializer=self.inner_init)

        self.b_i = self.create_bias(self.hidden_size, "bi")
        self.b_f = self.create_bias(self.hidden_size, "bf")
        # FIX: the bias value must scale the ones vector; the original
        # multiplied the *shape tuple* by it, which ignores the value when
        # it is 1 and builds an (hs, hs) matrix for any other value.
        self.b_f.set_value(np.ones((self.hidden_size,), dtype=FLOATX) * self._init_forget_bias)
        self.b_c = self.create_bias(self.hidden_size, "bc")
        self.b_o = self.create_bias(self.hidden_size, "bo")

        if summed_input_dim > 0:
            self.register_parameters(self.W, self.U, self.C,
                                     self.b_i, self.b_f, self.b_c, self.b_o)
        else:
            self.register_parameters(self.U, self.C,
                                     self.b_i, self.b_f, self.b_c, self.b_o)