#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np
import theano.tensor as T

from recurrent import RecurrentLayer
from deepy.utils import neural_computation, FLOATX


class PeepholeLSTM(RecurrentLayer):
    """
    Long short-term memory layer with peepholes.
    """
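
    # Step transition implemented by compute_new_state() below, written out
    # for reference (gate = self.gate_activate, act = self.activate; U and C
    # are single matrices sliced per gate):
    #   i_t = gate(x_i + U_i h_{t-1} + C_i c_{t-1} + b_i)
    #   f_t = gate(x_f + U_f h_{t-1} + C_f c_{t-1} + b_f)
    #   c_t = f_t * c_{t-1} + i_t * act(x_c + U_c h_{t-1} + b_c)
    #   o_t = gate(x_o + U_o h_{t-1} + C_o c_{t-1} + b_o)
    #   h_t = o_t * act(c_t)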

    def __init__(self, hidden_size, init_forget_bias=1, **kwargs):
        kwargs["hidden_size"] = hidden_size
        super(PeepholeLSTM, self).__init__("PLSTM", ["state", "lstm_cell"], **kwargs)
        # Keep the requested forget-gate bias; prepare() applies it to b_f.
        self._init_forget_bias = init_forget_bias

    @neural_computation
    def compute_new_state(self, step_inputs):
        xi_t, xf_t, xo_t, xc_t, h_tm1, c_tm1 = map(step_inputs.get, ["xi", "xf", "xo", "xc", "state", "lstm_cell"])
        if xi_t is None:
            # No merged input for this step: the gates see only the
            # recurrent and peephole terms.
            xi_t, xf_t, xo_t, xc_t = 0, 0, 0, 0

        # LSTM core step
        hs = self.hidden_size
        dot_h = T.dot(h_tm1, self.U)
        # Peephole connections come from the previous cell state.
        dot_c = T.dot(c_tm1, self.C)
        i_t = self.gate_activate(xi_t + dot_h[:, :hs] + self.b_i + dot_c[:, :hs])
        f_t = self.gate_activate(xf_t + dot_h[:, hs:hs*2] + self.b_f + dot_c[:, hs:hs*2])
        c_t = f_t * c_tm1 + i_t * self.activate(xc_t + dot_h[:, hs*2:hs*3] + self.b_c)
        o_t = self.gate_activate(xo_t + dot_h[:, hs*3:hs*4] + dot_c[:, hs*2:hs*3] + self.b_o)
        h_t = o_t * self.activate(c_t)

        return {"state": h_t, "lstm_cell": c_t}

    @neural_computation
    def merge_inputs(self, input_var, additional_inputs=None):
        if not additional_inputs:
            additional_inputs = []
        # Drop missing inputs with an explicit None check, so we never take
        # the truth value of a symbolic variable.
        all_inputs = [var for var in [input_var] + additional_inputs if var is not None]
        if not all_inputs:
            return {}
        last_dim_id = all_inputs[0].ndim - 1
        merged_input = T.concatenate(all_inputs, axis=last_dim_id)
        # One affine map for all gates, sliced into the four pre-activations.
        dot_input = T.dot(merged_input, self.W)
        merged_inputs = {
            "xi": dot_input[:, :, :self.hidden_size],
            "xf": dot_input[:, :, self.hidden_size:self.hidden_size*2],
            "xc": dot_input[:, :, self.hidden_size*2:self.hidden_size*3],
            "xo": dot_input[:, :, self.hidden_size*3:self.hidden_size*4],
        }
        return merged_inputs

    def prepare(self):
        if self._input_type == "sequence":
            all_input_dims = [self.input_dim] + self.additional_input_dims
        else:
            all_input_dims = self.additional_input_dims
        summed_input_dim = sum(all_input_dims, 0)
        self.output_dim = self.hidden_size

        # W maps the merged input to the four gate pre-activations, U is the
        # recurrent weight, and C holds the peephole weights for the input,
        # forget and output gates (the candidate update has no peephole).
        self.W = self.create_weight(summed_input_dim, self.hidden_size * 4, "W", initializer=self.outer_init)
        self.U = self.create_weight(self.hidden_size, self.hidden_size * 4, "U", initializer=self.inner_init)
        self.C = self.create_weight(self.hidden_size, self.hidden_size * 3, "C", initializer=self.inner_init)

        self.b_i = self.create_bias(self.hidden_size, "bi")
        self.b_f = self.create_bias(self.hidden_size, "bf")
        # Positive initial forget-gate bias so the cell remembers by default.
        self.b_f.set_value(np.ones((self.hidden_size,), dtype=FLOATX) * self._init_forget_bias)
        self.b_c = self.create_bias(self.hidden_size, "bc")
        self.b_o = self.create_bias(self.hidden_size, "bo")

        if summed_input_dim > 0:
            self.register_parameters(self.W, self.U, self.C,
                                     self.b_i, self.b_f, self.b_c, self.b_o)
        else:
            # Without any feed-forward input, W is unused and not registered.
            self.register_parameters(self.U, self.C,
                                     self.b_i, self.b_f, self.b_c, self.b_o)
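
# Minimal usage sketch (illustrative, not part of this module): the layer is
# constructed with a hidden size and, optionally, an initial forget-gate bias;
# the surrounding deepy network is expected to call prepare() and drive the
# merge_inputs()/compute_new_state() steps. The size below is arbitrary.
#
#     lstm = PeepholeLSTM(hidden_size=100, init_forget_bias=1)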