Completed
Push — master ( 2384cb...f17ea4 )
by Tinghui
01:12
created

LSTM.predict_accuracy()   A

Complexity

Conditions 3

Size

Total Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 3
c 1
b 0
f 0
dl 0
loc 12
rs 9.4285
1
import os
2
import logging
3
import numpy as np
4
import tensorflow as tf
5
from .layers import HiddenLayer, SoftmaxLayer
6
from .injectors import BatchSequenceInjector
7
from .criterion import MonitorBased, ConstIterations
8
9
logger = logging.getLogger(__name__)
10
11
12
class LSTM:
    """Basic single-layer Long-Short-Term Memory classifier.

    Graph layout: input of shape (batch, num_steps, num_features) -> per-step
    fully-connected hidden layer -> LSTM unrolled over num_steps -> softmax
    over the last step's output.

    NOTE: written against the legacy (pre-1.0) TensorFlow API
    (``tf.placeholder``, ``tf.nn.rnn``, ``tf.nn.rnn_cell``); it will not run
    unmodified on TensorFlow 2.x.
    """
    def __init__(self, num_features, num_classes, num_units, num_steps, optimizer=None):
        """Build the computation graph.

        Args:
            num_features (:obj:`int`): Dimensionality of each per-step feature vector.
            num_classes (:obj:`int`): Number of target classes.
            num_units (:obj:`int`): Number of LSTM units (hidden-state size).
            num_steps (:obj:`int`): Number of time steps the LSTM is unrolled over.
            optimizer: A ``tf.train`` optimizer instance; defaults to ``AdamOptimizer``.
        """
        self.num_features = num_features
        self.num_classes = num_classes
        self.num_steps = num_steps
        self.num_units = num_units
        self.summaries = []
        with tf.name_scope('input'):
            self.x = tf.placeholder(tf.float32, shape=[None, num_steps, num_features], name='input_x')
            # state_is_tuple=False below, so cell state and hidden state are
            # concatenated into a single (batch, 2 * num_units) tensor.
            self.init_state = tf.placeholder(tf.float32, shape=[None, 2 * num_units], name='init_state')
            self.y_ = tf.placeholder(tf.float32, shape=[None, num_classes], name='input_y')
        # Input hidden layer - flatten (batch, steps, features) to
        # (steps * batch, features) so one W/b pair is applied to every step.
        hidden_x = tf.reshape(tf.transpose(self.x, [1, 0, 2]), [-1, num_features])
        self.hidden_layer = HiddenLayer(num_features, num_units, 'Hidden', x=hidden_x)
        # Split the hidden layer output back into a per-step list, as required
        # by the legacy tf.nn.rnn API (old tf.split argument order: axis first).
        hidden_y = tf.split(0, num_steps, self.hidden_layer.y)
        # Apply RNN
        self.cell = tf.nn.rnn_cell.BasicLSTMCell(num_units=num_units, state_is_tuple=False)
        outputs, states = tf.nn.rnn(self.cell, hidden_y, initial_state=self.init_state)
        self.last_state = states[-1]
        # Softmax output layer, fed from the last unrolled step only.
        self.output_layer = SoftmaxLayer(num_units, num_classes, 'SoftmaxLayer', x=outputs[-1])
        # Predicted probability and hard class prediction.
        self.y = self.output_layer.y
        self.y_class = tf.argmax(self.y, 1)
        # Softmax cross-entropy loss (legacy positional logits/labels order).
        self.loss = tf.reduce_mean(
            tf.nn.softmax_cross_entropy_with_logits(self.output_layer.logits, self.y_,
                                                    name='SoftmaxCrossEntropy')
        )
        # Setup optimizer
        if optimizer is None:
            self.optimizer = tf.train.AdamOptimizer()
        else:
            self.optimizer = optimizer
        # Evaluation
        self.correct_prediction = tf.equal(self.y_class, tf.argmax(self.y_, 1))
        self.accuracy = tf.reduce_mean(tf.cast(self.correct_prediction, tf.float32))
        # Fit step
        with tf.name_scope('train'):
            self.fit_step = self.optimizer.minimize(self.loss)
        # Setup summaries
        self.summaries += self.hidden_layer.summaries
        self.summaries += self.output_layer.summaries
        self.summaries.append(tf.summary.scalar('cross_entropy', self.loss))
        self.summaries.append(tf.summary.scalar('accuracy', self.accuracy))
        self.merged = tf.summary.merge(self.summaries)
        # Lazily created/cached session (see _get_session).
        self.sess = None

    def _get_session(self, session):
        """Return a usable session: the one given, the cached one, or a new one.

        A newly created session is cached on the instance; an explicitly
        passed session is used as-is and NOT cached (matches original behavior).
        """
        if session is not None:
            return session
        if self.sess is None:
            self.sess = tf.Session()
        return self.sess

    def _zero_state(self, batch_len):
        """Return an all-zero initial LSTM state for ``batch_len`` sequences."""
        return np.zeros((batch_len, 2 * self.num_units))

    def _run_batched(self, tensor, x, session, batch_size):
        """Evaluate ``tensor`` over ``x`` one sequence-batch at a time.

        Feeds a zero initial state per batch and concatenates the per-batch
        results along axis 0. Returns None if the injector yields no batches.
        """
        injector = BatchSequenceInjector(batch_size=batch_size, data_x=x, seq_len=self.num_steps)
        injector.reset()
        result = None
        while injector.num_epochs == 0:
            batch_x = injector.next_batch()
            batch_out = session.run(
                tensor,
                feed_dict={self.x: batch_x,
                           self.init_state: self._zero_state(batch_x.shape[0])})
            result = batch_out if result is None else np.concatenate((result, batch_out), axis=0)
        return result

    def fit(self, x, y, batch_size=100, iter_num=100, summaries_dir=None, summary_interval=10,
            test_x=None, test_y=None, session=None, criterion='const_iteration'):
        """Fit the model to the dataset.

        Args:
            x (:obj:`numpy.ndarray`): Input features of shape (num_samples, num_features).
            y (:obj:`numpy.ndarray`): Corresponding labels of shape (num_samples) for binary
                classification, or (num_samples, num_classes) for multi-class classification.
            batch_size (:obj:`int`): Batch size used in gradient descent.
            iter_num (:obj:`int`): Number of training iterations for const iterations, step depth
                for the monitor-based stopping criterion.
            summaries_dir (:obj:`str`): Path of the directory to store summaries and saved values.
                Required when ``criterion`` is 'monitor_based' (used for checkpoints).
            summary_interval (:obj:`int`): The step interval to export variable summaries.
            test_x (:obj:`numpy.ndarray`): Test feature array used for monitoring training progress.
            test_y (:obj:`numpy.ndarray`): Test label array used for monitoring training progress.
            session (:obj:`tensorflow.Session`): Session to run training functions.
            criterion (:obj:`str`): Stopping criterion, 'const_iteration' or 'monitor_based'.

        Raises:
            ValueError: If ``criterion`` is 'monitor_based' and ``summaries_dir`` is None.
        """
        session = self._get_session(session)
        if summaries_dir is not None:
            train_writer = tf.summary.FileWriter(summaries_dir + '/train')
            test_writer = tf.summary.FileWriter(summaries_dir + '/test')
        session.run(tf.global_variables_initializer())
        # Get stopping criterion. For 'monitor_based' this also carves the last
        # 20% of (x, y) off as a validation set, so the training injector must
        # be created AFTER this branch — the original created it first, which
        # silently trained on the validation rows (data leakage).
        if criterion == 'const_iteration':
            _criterion = ConstIterations(num_iters=iter_num)
        elif criterion == 'monitor_based':
            if summaries_dir is None:
                raise ValueError("criterion 'monitor_based' requires summaries_dir "
                                 "to store the best checkpoint")
            num_samples = x.shape[0]
            # // avoids the Python 2 pitfall where int(1/5 * n) == 0.
            valid_set_len = num_samples // 5
            valid_x = x[num_samples - valid_set_len:num_samples, :]
            valid_y = y[num_samples - valid_set_len:num_samples, :]
            x = x[0:num_samples - valid_set_len, :]
            y = y[0:num_samples - valid_set_len, :]
            # predict() yields one prediction per sequence window, i.e.
            # num_steps fewer rows than valid_x — hence the label offset.
            _criterion = MonitorBased(n_steps=iter_num,
                                      monitor_fn=self.predict_accuracy,
                                      monitor_fn_args=(valid_x, valid_y[self.num_steps:, :]),
                                      save_fn=tf.train.Saver().save,
                                      save_fn_args=(session, os.path.join(summaries_dir, 'best.ckpt')))
        else:
            logger.error('Wrong criterion %s specified.', criterion)
            return
        # Setup batch injector over the (possibly trimmed) training set.
        injector = BatchSequenceInjector(data_x=x, data_y=y, batch_size=batch_size,
                                         seq_len=self.num_steps)
        # Fixed train/test sequences for brief reporting of accuracy and loss.
        train_seq_x, train_seq_y = BatchSequenceInjector.to_sequence(
            self.num_steps, x, y, start=0, end=2000
        )
        if (test_x is not None) and (test_y is not None):
            test_seq_x, test_seq_y = BatchSequenceInjector.to_sequence(
                self.num_steps, test_x, test_y, start=0, end=2000
            )
        # Iteration starts
        i = 0
        while _criterion.continue_learning():
            batch_x, batch_y = injector.next_batch()
            if summaries_dir is not None and (i % summary_interval == 0):
                summary, loss, accuracy = session.run(
                    [self.merged, self.loss, self.accuracy],
                    feed_dict={self.x: train_seq_x, self.y_: train_seq_y,
                               self.init_state: self._zero_state(train_seq_x.shape[0])}
                )
                train_writer.add_summary(summary, i)
                logger.info('Step %d, train_set accuracy %g, loss %g', i, accuracy, loss)
                if (test_x is not None) and (test_y is not None):
                    merged, accuracy = session.run(
                        [self.merged, self.accuracy],
                        feed_dict={self.x: test_seq_x, self.y_: test_seq_y,
                                   self.init_state: self._zero_state(test_seq_x.shape[0])})
                    test_writer.add_summary(merged, i)
                    logger.info('test_set accuracy %g', accuracy)
            loss, accuracy, _ = session.run(
                [self.loss, self.accuracy, self.fit_step],
                feed_dict={self.x: batch_x, self.y_: batch_y,
                           self.init_state: self._zero_state(batch_x.shape[0])})
            i += 1
        # Finish iteration: reload the best checkpoint saved by MonitorBased.
        if criterion == 'monitor_based':
            tf.train.Saver().restore(session, os.path.join(summaries_dir, 'best.ckpt'))
        logger.debug('Total Epoch: %d, current batch %d', injector.num_epochs, injector.cur_batch)

    def predict_proba(self, x, session=None, batch_size=500):
        """Predict class probabilities (softmax output) for each sequence in x."""
        session = self._get_session(session)
        return self._run_batched(self.y, x, session, batch_size)

    def predict(self, x, session=None, batch_size=500):
        """Predict the class label (argmax of softmax) for each sequence in x."""
        session = self._get_session(session)
        return self._run_batched(self.y_class, x, session, batch_size)

    def predict_accuracy(self, x, y, session=None):
        """Get accuracy given feature array and corresponding labels.

        ``y`` must align row-for-row with ``predict(x)``'s output (which has
        ``num_steps`` fewer rows than ``x``) — callers trim labels accordingly,
        as ``fit`` does for its validation set.
        """
        session = self._get_session(session)
        predict = self.predict(x, session=session)
        accuracy = np.sum(predict == y.argmax(y.ndim - 1)) / float(y.shape[0])
        return accuracy