HMM   A
last analyzed

Complexity

Total Complexity 22

Size/Duplication

Total Lines 163
Duplicated Lines 8.59 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 14
loc 163
rs 10
wmc 22

6 Methods

Rating   Name   Duplication   Size   Complexity  
D predict() 14 27 8
A save() 0 9 1
B learn() 0 21 6
A predict_prob() 0 11 3
A decode() 0 48 3
A __init__() 0 7 1

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

1
import pickle
2
import numpy as np
3
from hmmlearn.hmm import MultinomialHMM
4
5
6
class HMM:
7
    r"""Hidden Markov Model
8
9
    This HMM class implements solution of two problems:
10
    # Supervised learning problem
11
    # Decode Problem
12
13
    Supervised learning:
14
    
15
    Given a sequence of observation O: O1, O2, O3, ... and corresponding state sequence Q: Q1, Q2, Q3, ...
16
    Update probability of state transition matrix A, and observation probability matrix B.
17
    
18
    The value of both A and B are estimated based on the state transition and observation in training data,
19
    so that the joint probability of X (the given observation) and Y (the given state sequence) is maximized.
20
21
    Decode Problem:
22
    
23
    Given state transition matrix A, and observation probability matrix B, and a sequence of observation
24
    O: O1, O2, O3, ... Find the most probable state sequence Q: Q1, Q3, Q3, ...
25
    
26
    In this code, Vertibi algorithm is implemented to solve the decode problem.
27
28
    Args:
29
        num_states (:obj:`int`): Size of state space
30
        num_observations (:obj:`int`): Size of observation space
31
        
32
    Attributes:
33
        num_states (:obj:`int`): Size of state space
34
        num_observations (:obj:`int`): Size of observation space
35
        A (:obj:`numpy.ndarray`): Transition matrix of size (num_states, num_states), where :math:`a_{ij}` is the 
36
            probability of transition from :math:`q_i` to :math:`q_j`
37
        B (:obj:`numpy.ndarray`): Emission matrix of size (num_states, num_observation), where :math:`b_{ij}` is the 
38
            probability of observing :math:`o_j` from :math:`q_i`
39
        total_learned_events (:obj:`int`): Number of events learned so far - used for incremental learning
40
    """
41
    def __init__(self, num_states, num_observations):
42
        self.num_states = num_states
43
        self.num_observations = num_observations
44
        self.A_count = np.ones((num_states, num_states))
45
        self.B_count = np.ones((num_states, num_observations))
46
        self.A = self.A_count / np.sum(self.A_count, axis=1, keepdims=True)
47
        self.B = self.B_count / np.sum(self.B_count, axis=1, keepdims=True)
48
49
    def decode(self, observations, init_probability):
50
        """Vertibi Decode Algorithm
51
52
        Parameters
53
        ----------
54
        observations : np.array
55
            A sequence of observation of size (T, ) O: O_1, O_2, ..., O_T
56
        init_probability : np.array
57
            Initial probability of states represented by an array of size (N, )
58
59
        Vertibi algorithm is composed of three parts: Initialization, Recursion and Termination
60
61
        Temporary Parameters
62
        --------------------
63
        trellis : np.array
64
            trellis stores the best scores so far (or Vertibi path probility)
65
            It is an array of size (N, T), where N is the number of states, and T is the length of observation sequence
66
            trellis_{jt} = max_{q_0, q_1, ..., q_{t-1}} P(q_0, q_1, ..., q_t, o_1, o_2, ..., o_n, q_t=j | \lambda)
67
        back_trace : np.array
68
            back_trace is an array that stores the most possible path that corresponds to the best scores in trellis
69
            It is an array of size (N, T) as well
70
        """
71
        # Allocate temporary arrays
72
        T = observations.shape[0]
73
        trellis = np.zeros((self.num_states, T))
74
        back_trace = np.ones((self.num_states, T)) * -1
75
76
        # Initialization
77
        # trellis_{i0} = \pi_{i} * B{i,O_0}
78
        # In Probability Term:
79
        # P(O_0, q_0=k) = P(O_0 | q_0=k) * P(q_0 = k)
80
        trellis[:, 0] = np.squeeze(init_probability * self.B[:, observations[0]])
81
82
        # Recursion
83
        # If the end state is q_T = k, find the q_{T-1} so that the likelihood to q_T = k is maximized
84
        # And store that maximum likelihood in trellis for future use
85
        # trellis_{k, T} = max_{x} P(O_0, O_1, ..., O_T, q_0, q_1, ..., q_{T-1}=x, q_T=k)
86
        #                = max_{x} P(O_0, O_1, ..., O_{T-1}, q_0, q_1, ..., q_{T-1}=x) * P(q_T=k|q_{T-1}=x) * P(O_T|q_T=k)
87
        #                = max_{x} trellis_{x, T-1} * A_{x,k} * B(k, O_T)
88
        for i in range(1, T):
89
            trellis[:, i] = (trellis[:, i-1, None].dot(self.B[:, observations[i], None].T) * self.A).max(0)
90
            back_trace[:, i] = (np.tile(trellis[:, i-1, None], [1, self.num_states]) * self.A).argmax(0)
91
92
        # Termination - back trace
93
        tokens = [trellis[:, -1].argmax()]
94
        for i in range(T-1, 0, -1):
95
            tokens.append(int(back_trace[tokens[-1], i]))
96
        return tokens[::-1]
97
98
    def learn(self, observations, states):
99
        """Update transition matrix A and emission matrix B with training sequence composed of a sequence of 
100
        observations and corresponding states.
101
        """
102
        # Make sure that states and observations equal each other
103
        if observations.shape[0] == states.shape[0]:
104
            T = observations.shape[0]
105
        else:
106
            return -1
107
        # Update Counts
108
        for i in range(1, T):
109
            if states[i] >= self.num_states or states[i-1] >= self.num_states or \
110
                            observations[i] >= self.num_observations:
111
                return -2
112
            # Update Emission Count (Skip the first one)
113
            self.B_count[states[i], observations[i]] += 1
114
            # Update State Transition
115
            self.A_count[states[i-1], states[i]] += 1
116
        # Update Probability Matrix
117
        self.A = self.A_count / np.sum(self.A_count, axis=1, keepdims=True)
118
        self.B = self.B_count / np.sum(self.B_count, axis=1, keepdims=True)
119
120
    def save(self, filename):
121
        """Pickle the model to file.
122
        
123
        Args:
124
            filename (:obj:`str`): The path of the file to store the model parameters.
125
        """
126
        f = open(filename, 'wb')
127
        pickle.dump(self, f, protocol=pickle.HIGHEST_PROTOCOL)
128
        f.close()
129
130
    def predict(self, x, init_prob=None, method='hmmlearn', window=-1):
131
        """Predict result based on HMM
132
        """
133
        if init_prob is None:
134
            init_prob = np.array([1/self.num_states for i in range(self.num_states)])
135
        if method == 'hmmlearn':
136
            model = MultinomialHMM(self.num_states, n_iter=100)
137
            model.n_features = self.num_observations
138
            model.startprob_ = init_prob
139
            model.emissionprob_ = self.B
140
            model.transmat_ = self.A
141 View Code Duplication
            if window == -1:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
142
                result = model.predict(x)
143
            else:
144
                result = np.zeros(x.shape[0], dtype=np.int)
145
                result[0:window] = model.predict(x[0:window])
146
                for i in range(window, x.shape[0]):
147
                    result[i] = model.predict(x[i-window+1:i+1])[-1]
148
        else:
149 View Code Duplication
            if window == -1:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
150
                result = self.decode(x, init_prob)
151
            else:
152
                result = np.zeros(x.shape[0], dtype=np.int)
153
                result[0:window] = self.decode(x[0:window], init_prob)
154
                for i in range(window, x.shape[0]):
155
                    result[i] = self.decode(x[i-window+1:i+1], init_prob)[-1]
156
        return result
157
158
    def predict_prob(self, x, init_prob=None, window=-1):
159
        """Predict the probability
160
        """
161
        if init_prob is None:
162
            init_prob = np.array([1/self.num_states for i in range(self.num_states)])
163
        model = MultinomialHMM(self.num_states)
164
        model.n_features = self.num_observations
165
        model.startprob_ = init_prob
166
        model.emissionprob_ = self.B
167
        model.transmat_ = self.A
168
        return model.predict_proba(x)
169