Completed — Push to master (39f179...514f8f), created by Tinghui at 01:17

LSTM.fit() — rated F

Complexity: Conditions 11
Size: Total Lines 45
Duplication: Lines 0, Ratio 0 %
Importance: Changes 1, Bugs 0, Features 0
Metric                        Value
cc  (cyclomatic complexity)   11
c   (changes)                 1
b   (bugs)                    0
f   (features)                0
dl  (duplicated lines)        0
loc (lines of code)           45
rs  (rating score)            3.1764
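For reference, a cyclomatic complexity (cc) of 11 means there are eleven linearly independent paths through LSTM.fit(). The number can be reproduced locally with a standard complexity checker; the sketch below uses radon, which is an assumption on my part — the report does not name the underlying analyzer, and the file path is hypothetical.

    from radon.complexity import cc_visit  # assumes the radon package is installed

    with open('lstm.py') as f:  # hypothetical path to the module shown below
        blocks = cc_visit(f.read())
    for block in blocks:
        # Each block reports its name and cyclomatic complexity, e.g. "fit 11"
        print(block.name, block.complexity)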

How to fix: Complexity

Complex methods like LSTM.fit() often do a lot of different things. To break such a method down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
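For instance, the summary-writing concern inside fit() (the train_writer/test_writer pair and the runs of self.merged) shares a common vocabulary and could be pulled out. A minimal sketch of what that extraction might look like — SummaryLogger and its method name are hypothetical, not part of the original code:

    import tensorflow as tf

    class SummaryLogger:
        """Hypothetical extract-class target owning the writers and the merged op."""
        def __init__(self, summaries_dir, merged):
            self.train_writer = tf.summary.FileWriter(summaries_dir + '/train')
            self.test_writer = tf.summary.FileWriter(summaries_dir + '/test')
            self.merged = merged

        def log(self, session, writer, feed_dict, step):
            # Evaluate the merged summary op and record it at the given step
            writer.add_summary(session.run(self.merged, feed_dict=feed_dict), step)

With such a class in place, fit() would only decide when to log, which removes several of the eleven branches counted above.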

import logging
import numpy as np
import tensorflow as tf
from .layers import HiddenLayer, SoftmaxLayer
from .injectors import BatchSequenceInjector

logger = logging.getLogger(__name__)


# NOTE: this module targets the pre-1.0 TensorFlow API
# (tf.split(split_dim, num_split, value), tf.nn.rnn, tf.nn.rnn_cell).
class LSTM:
    """Basic single-layer Long Short-Term Memory model.
    """
    def __init__(self, num_features, num_classes, num_units, num_steps, optimizer=None):
        self.num_features = num_features
        self.num_classes = num_classes
        self.num_steps = num_steps
        self.num_units = num_units
        self.summaries = []
        with tf.name_scope('input'):
            self.x = tf.placeholder(tf.float32, shape=[None, num_steps, num_features], name='input_x')
            self.init_state = tf.placeholder(tf.float32, shape=[None, 2 * num_units], name='init_state')
            self.y_ = tf.placeholder(tf.float32, shape=[None, num_classes], name='input_y')
        # Input hidden layer - unroll num_steps and apply W/b to every step
        hidden_x = tf.reshape(tf.transpose(self.x, [1, 0, 2]), [-1, num_features])
        self.hidden_layer = HiddenLayer(num_features, num_units, 'Hidden', x=hidden_x)
        # The hidden layer's output must be split per step before feeding the RNN
        hidden_y = tf.split(0, num_steps, self.hidden_layer.y)
        # Apply RNN
        self.cell = tf.nn.rnn_cell.BasicLSTMCell(num_units=num_units, state_is_tuple=False)
        outputs, states = tf.nn.rnn(self.cell, hidden_y, initial_state=self.init_state)
        self.last_state = states[-1]
        # Output softmax layer
        self.output_layer = SoftmaxLayer(num_units, num_classes, 'SoftmaxLayer', x=outputs[-1])
        # Predicted probability
        self.y = self.output_layer.y
        self.y_class = tf.argmax(self.y, 1)
        # Softmax cross-entropy loss
        self.loss = tf.reduce_mean(
            tf.nn.softmax_cross_entropy_with_logits(self.output_layer.logits, self.y_,
                                                    name='SoftmaxCrossEntropy')
        )
        # Set up optimizer
        if optimizer is None:
            self.optimizer = tf.train.AdamOptimizer()
        else:
            self.optimizer = optimizer
        # Evaluation
        self.correct_prediction = tf.equal(self.y_class, tf.argmax(self.y_, 1))
        self.accuracy = tf.reduce_mean(tf.cast(self.correct_prediction, tf.float32))
        # Fit step
        with tf.name_scope('train'):
            self.fit_step = self.optimizer.minimize(self.loss)
        # Set up summaries
        self.summaries += self.hidden_layer.summaries
        self.summaries += self.output_layer.summaries
        self.summaries.append(tf.summary.scalar('cross_entropy', self.loss))
        self.summaries.append(tf.summary.scalar('accuracy', self.accuracy))
        self.merged = tf.summary.merge(self.summaries)
        self.sess = None

    def fit(self, x, y, batch_size=100, iter_num=100, summaries_dir=None, summary_interval=10,
            test_x=None, test_y=None, session=None, criterion=None):
        """Fit the model to the dataset.
        """
        if session is None:
            if self.sess is None:
                session = tf.Session()
                self.sess = session
            else:
                session = self.sess
        if summaries_dir is not None:
            train_writer = tf.summary.FileWriter(summaries_dir + '/train')
            test_writer = tf.summary.FileWriter(summaries_dir + '/test')
        session.run(tf.global_variables_initializer())
        # Set up the batch injector
        injector = BatchSequenceInjector(data_x=x, data_y=y, batch_size=batch_size, seq_len=self.num_steps)
        # Test sequences (only the first 2000 samples are used for summaries)
        if (test_x is not None) and (test_y is not None):
            test_seq_x, test_seq_y = BatchSequenceInjector.to_sequence(
                self.num_steps, test_x, test_y, start=0, end=2000
            )
        train_seq_x, train_seq_y = BatchSequenceInjector.to_sequence(
            self.num_steps, x, y, start=0, end=2000
        )
        for i in range(iter_num):
            batch_x, batch_y = injector.next_batch()
            if summaries_dir is not None and (i % summary_interval == 0):
                summary = session.run(
                    self.merged,
                    feed_dict={self.x: train_seq_x, self.y_: train_seq_y,
                               self.init_state: np.zeros((train_seq_x.shape[0], 2 * self.num_units))}
                )
                train_writer.add_summary(summary, i)
                if (test_x is not None) and (test_y is not None):
                    merged, accuracy = session.run(
                        [self.merged, self.accuracy],
                        feed_dict={self.x: test_seq_x, self.y_: test_seq_y,
                                   self.init_state: np.zeros((test_seq_x.shape[0], 2 * self.num_units))})
                    test_writer.add_summary(merged, i)
                    print('test accuracy %g' % accuracy)
            loss, accuracy, _ = session.run(
                [self.loss, self.accuracy, self.fit_step],
                feed_dict={self.x: batch_x, self.y_: batch_y,
                           self.init_state: np.zeros((batch_x.shape[0], 2 * self.num_units))})
            print('Step %d, training accuracy %g, loss %g' % (i, accuracy, loss))
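A minimal usage sketch, assuming that BatchSequenceInjector expects flat arrays of shape (num_samples, num_features) with one-hot labels — the shapes and hyper-parameters here are inferred from the code above, not stated in the report:

    import numpy as np

    # Toy data: 1000 samples, 8 features, 3 one-hot classes (assumed shapes)
    x = np.random.rand(1000, 8).astype(np.float32)
    y = np.eye(3, dtype=np.float32)[np.random.randint(0, 3, size=1000)]

    model = LSTM(num_features=8, num_classes=3, num_units=32, num_steps=10)
    model.fit(x, y, batch_size=100, iter_num=50)
    proba = model.predict_proba(x)   # per-class probabilities
    labels = model.predict(x)        # argmax class indices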
    # Analyzer note: this code seems to be duplicated in the project (see predict()).
    def predict_proba(self, x, session=None, batch_size=500):
        """Predict probability (Softmax)
        """
        if session is None:
            if self.sess is None:
                session = tf.Session()
                self.sess = session
            else:
                session = self.sess
        injector = BatchSequenceInjector(batch_size=batch_size, data_x=x, seq_len=self.num_steps)
        injector.reset()
        result = None
        while injector.num_epochs == 0:
            batch_x = injector.next_batch()
            batch_y = session.run(self.y,
                                  feed_dict={self.x: batch_x,
                                             self.init_state: np.zeros((batch_x.shape[0], 2 * self.num_units))})
            if result is None:
                result = batch_y
            else:
                result = np.concatenate((result, batch_y), axis=0)
        return result
    # Analyzer note: this code seems to be duplicated in the project (see predict_proba()).
    def predict(self, x, session=None, batch_size=500):
        """Predict the most likely class for each sample.
        """
        if session is None:
            if self.sess is None:
                session = tf.Session()
                self.sess = session
            else:
                session = self.sess
        injector = BatchSequenceInjector(batch_size=batch_size, data_x=x, seq_len=self.num_steps)
        injector.reset()
        result = None
        while injector.num_epochs == 0:
            batch_x = injector.next_batch()
            batch_y = session.run(self.y_class,
                                  feed_dict={self.x: batch_x,
                                             self.init_state: np.zeros((batch_x.shape[0], 2 * self.num_units))})
            if result is None:
                result = batch_y
            else:
                result = np.concatenate((result, batch_y), axis=0)
        return result
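On the duplication flags: predict_proba() and predict() differ only in which tensor they fetch (self.y versus self.y_class). One way to collapse them into a single loop — the _run_batches helper below is hypothetical, not part of the original code:

    def _run_batches(self, fetch, x, session=None, batch_size=500):
        """Run `fetch` over x in sequence batches and concatenate the results."""
        if session is None:
            if self.sess is None:
                session = tf.Session()
                self.sess = session
            else:
                session = self.sess
        injector = BatchSequenceInjector(batch_size=batch_size, data_x=x, seq_len=self.num_steps)
        injector.reset()
        result = None
        while injector.num_epochs == 0:
            batch_x = injector.next_batch()
            batch_y = session.run(fetch, feed_dict={
                self.x: batch_x,
                self.init_state: np.zeros((batch_x.shape[0], 2 * self.num_units))})
            result = batch_y if result is None else np.concatenate((result, batch_y), axis=0)
        return result

    def predict_proba(self, x, session=None, batch_size=500):
        """Predict probability (Softmax)."""
        return self._run_batches(self.y, x, session, batch_size)

    def predict(self, x, session=None, batch_size=500):
        """Predict the most likely class for each sample."""
        return self._run_batches(self.y_class, x, session, batch_size)

This also removes the duplicated session-resolution branches, which should lower both the duplication count and the per-method complexity on the next inspection.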