Completed
Branch master (f50597)
by Wouter
52s
created

ImportanceWeightedClassifier (rating: B)

Complexity

Total Complexity 47

Size/Duplication

Total Lines 370
Duplicated Lines 17.3 %

Test Coverage

Coverage 31.62%

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 64 (duplicated lines)
loc 370 (lines of code)
ccs 43 (covered statements)
cts 136 (total statements)
cp 0.3162 (coverage proportion)
rs 8.439
wmc 47 (weighted method complexity)

10 Methods

Rating   Name   Duplication   Size   Complexity  
B predict() 0 25 4
A is_trained() 0 3 1
B iwe_logistic_discrimination() 0 31 2
B __init__() 64 64 4
B iwe_kernel_mean_matching() 0 49 5
B iwe_kernel_densities() 0 28 6
B iwe_nearest_neighbours() 0 33 4
A get_params() 0 3 1
D fit() 0 50 10
D iwe_ratio_gaussians() 0 67 10

How to fix

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems each have a corresponding solution.

Complex Class

 Tip:   Before tackling complexity, make sure you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like ImportanceWeightedClassifier often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
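In ImportanceWeightedClassifier, the iwe_* methods share a prefix and form exactly such a cohesive component. The sketch below is only an illustration of the Extract Class idea, not part of the analysed code: it assumes a hypothetical ImportanceWeightEstimator class and shows a single estimator (the nearest-neighbour one) moved into it.

import numpy as np
from scipy.spatial.distance import cdist


class ImportanceWeightEstimator(object):
    """Hypothetical extracted component for importance-weight estimation."""

    def __init__(self, method='nn', smoothing=True, clip=-1):
        self.method = method
        self.smoothing = smoothing
        self.clip = clip

    def estimate(self, X, Z):
        """Dispatch to the chosen estimator (only 'nn' is sketched here)."""
        if self.method == 'nn':
            return self._nearest_neighbours(X, Z)
        raise NotImplementedError('Estimator not implemented.')

    def _nearest_neighbours(self, X, Z):
        # Count target samples in each source sample's Voronoi cell
        N = X.shape[0]
        ix = np.argmin(cdist(X, Z, metric='euclidean'), axis=0)
        iw, _ = np.histogram(ix, np.arange(N + 1))

        # Optional Laplace smoothing and weight clipping
        if self.smoothing:
            iw = (iw + 1.) / (N + 1)
        if self.clip > 0:
            iw = np.minimum(self.clip, np.maximum(0, iw))
        return iw

With such a component, ImportanceWeightedClassifier would keep a single weight-estimator attribute and call estimate(X, Z) inside fit(), which would also remove most of the duplicated constructor arguments flagged above.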

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np
import scipy.stats as st
from scipy.spatial.distance import cdist
import sklearn as sk
from sklearn.svm import LinearSVC
from sklearn.linear_model import LogisticRegression, LinearRegression
from sklearn.model_selection import cross_val_predict
from os.path import basename
from cvxopt import matrix, solvers

from .util import is_pos_def


class ImportanceWeightedClassifier(object):
    """
    Class of importance-weighted classifiers.

    Methods contain different importance-weight estimators and different loss
    functions.
    """

    def __init__(self, loss='logistic', l2=1.0, iwe='lr', smoothing=True,
                 clip=-1, kernel_type='rbf', bandwidth=1):
        """
        Select a particular type of importance-weighted classifier.

        Parameters
        ----------
        loss : str
            loss function for weighted classifier, options: 'logistic',
            'quadratic', 'hinge' (def: 'logistic')
        l2 : float
            l2-regularization parameter value (def: 1.0)
        iwe : str
            importance weight estimator, options: 'lr', 'nn', 'rg', 'kmm',
            'kde' (def: 'lr')
        smoothing : bool
            whether to apply Laplace smoothing to the nearest-neighbour
            importance-weight estimator (def: True)
        clip : float
            maximum allowable importance-weight value; if set to -1, then the
            weights are not clipped (def: -1)
        kernel_type : str
            what type of kernel to use for kernel density estimation or kernel
            mean matching, options: 'diste', 'rbf' (def: 'rbf')
        bandwidth : float
            kernel bandwidth parameter value for kernel-based weight
            estimators (def: 1)

        Returns
        -------
        None

        Examples
        --------
        >>> clf = ImportanceWeightedClassifier()

        """
        self.loss = loss
        self.l2 = l2
        self.iwe = iwe
        self.smoothing = smoothing
        self.clip = clip
        self.kernel_type = kernel_type
        self.bandwidth = bandwidth

        # Initialize untrained classifiers based on choice of loss function
        if self.loss == 'logistic':
            # Logistic regression model
            self.clf = LogisticRegression()
        elif self.loss == 'quadratic':
            # Least-squares model
            self.clf = LinearRegression()
        elif self.loss == 'hinge':
            # Linear support vector machine
            self.clf = LinearSVC()
        else:
            # Other loss functions are not implemented
            raise NotImplementedError('Loss function not implemented.')

        # Whether model has been trained
        self.is_trained = False

        # Dimensionality of training data
        self.train_data_dim = ''

    def iwe_ratio_gaussians(self, X, Z):
        """
        Estimate importance weights based on a ratio of Gaussian distributions.

        Parameters
        ----------
        X : array
            source data (N samples by D features)
        Z : array
            target data (M samples by D features)

        Returns
        -------
        iw : array
            importance weights (N samples by 1)

        Examples
        --------
        >>> X = np.random.randn(10, 2)
        >>> Z = np.random.randn(10, 2)
        >>> clf = ImportanceWeightedClassifier()
        >>> iw = clf.iwe_ratio_gaussians(X, Z)

        """
        # Data shapes
        N, DX = X.shape
        M, DZ = Z.shape

        # Assert equivalent dimensionalities
        if not DX == DZ:
            raise ValueError('Dimensionalities of X and Z should be equal.')

        # Sample means in each domain
        mu_X = np.mean(X, axis=0)
        mu_Z = np.mean(Z, axis=0)

        # Sample covariances
        Si_X = np.cov(X.T)
        Si_Z = np.cov(Z.T)

        # Check for positive-definiteness of covariance matrices
        if not (is_pos_def(Si_X) or is_pos_def(Si_Z)):
            print('Warning: covariance matrices not PSD.')

            regct = -6
            while not (is_pos_def(Si_X) or is_pos_def(Si_Z)):
                print('Adding regularization: ' + str(10.**regct))

                # Add regularization
                Si_X += np.eye(DX)*10.**regct
                Si_Z += np.eye(DZ)*10.**regct

                # Increment regularization counter
                regct += 1

        # Compute probability of X under each domain
        pT = st.multivariate_normal.pdf(X, mu_Z, Si_Z)
        pS = st.multivariate_normal.pdf(X, mu_X, Si_X)

        # Check for numerical problems
        if np.any(np.isnan(pT)) or np.any(pT == 0):
            raise ValueError('Target probabilities are NaN or 0.')
        if np.any(np.isnan(pS)) or np.any(pS == 0):
            raise ValueError('Source probabilities are NaN or 0.')

        # Return the ratio of probabilities
        return pT / pS

    def iwe_kernel_densities(self, X, Z):
        """
        Estimate importance weights based on kernel density estimation.

        INPUT   (1) array 'X': source data (N samples by D features)
                (2) array 'Z': target data (M samples by D features)
        OUTPUT  (1) array: importance weights (N samples by 1)
        """
        # Data shapes
        N, DX = X.shape
        M, DZ = Z.shape

        # Assert equivalent dimensionalities
        if not DX == DZ:
            raise ValueError('Dimensionalities of X and Z should be equal.')

        # Compute probabilities based on source and target kernel densities
        pT = st.gaussian_kde(Z.T).pdf(X.T)
        pS = st.gaussian_kde(X.T).pdf(X.T)

        # Check for numerical problems
        if np.any(np.isnan(pT)) or np.any(pT == 0):
            raise ValueError('Target probabilities are NaN or 0.')
        if np.any(np.isnan(pS)) or np.any(pS == 0):
            raise ValueError('Source probabilities are NaN or 0.')

        # Return the ratio of probabilities
        return pT / pS

    def iwe_logistic_discrimination(self, X, Z):
        """
        Estimate importance weights based on logistic regression.

        INPUT   (1) array 'X': source data (N samples by D features)
                (2) array 'Z': target data (M samples by D features)
        OUTPUT  (1) array: importance weights (N samples by 1)
        """
        # Data shapes
        N, DX = X.shape
        M, DZ = Z.shape

        # Assert equivalent dimensionalities
        if not DX == DZ:
            raise ValueError('Dimensionalities of X and Z should be equal.')

        # Make domain-label variable
        y = np.concatenate((np.zeros((N, 1)),
                            np.ones((M, 1))), axis=0)

        # Concatenate data
        XZ = np.concatenate((X, Z), axis=0)

        # Call a logistic regressor
        lr = LogisticRegression(C=self.l2)

        # Predict probability of belonging to target using cross-validation
        preds = cross_val_predict(lr, XZ, y[:, 0])

        # Return predictions for source samples
        return preds[:N]

    def iwe_nearest_neighbours(self, X, Z):
        """
        Estimate importance weights based on nearest-neighbours.

        INPUT   (1) array 'X': source data (N samples by D features)
                (2) array 'Z': target data (M samples by D features)
        OUTPUT  (1) array: importance weights (N samples by 1)
        """
        # Data shapes
        N, DX = X.shape
        M, DZ = Z.shape

        # Assert equivalent dimensionalities
        if not DX == DZ:
            raise ValueError('Dimensionalities of X and Z should be equal.')

        # Compute Euclidean distance between samples
        d = cdist(X, Z, metric='euclidean')

        # Count target samples within each source Voronoi cell
        ix = np.argmin(d, axis=0)
        iw, _ = np.histogram(ix, np.arange(N+1))

        # Laplace smoothing
        if self.smoothing:
            iw = (iw + 1.) / (N + 1)

        # Weight clipping
        if self.clip > 0:
            iw = np.minimum(self.clip, np.maximum(0, iw))

        # Return weights
        return iw

    def iwe_kernel_mean_matching(self, X, Z):
        """
        Estimate importance weights based on kernel mean matching.

        INPUT   (1) array 'X': source data (N samples by D features)
                (2) array 'Z': target data (M samples by D features)
        OUTPUT  (1) array: importance weights (N samples by 1)
        """
        # Data shapes
        N, DX = X.shape
        M, DZ = Z.shape

        # Assert equivalent dimensionalities
        if not DX == DZ:
            raise ValueError('Dimensionalities of X and Z should be equal.')

        # Compute sample pairwise distances
        KXX = cdist(X, X, metric='euclidean')
        KXZ = cdist(X, Z, metric='euclidean')

        # Check for non-negative distances
        if not np.all(KXX >= 0):
            raise ValueError('Negative distance in source kernel.')
        if not np.all(KXZ >= 0):
            raise ValueError('Negative distance in source-target kernel.')

        # Compute kernels
        if self.kernel_type == 'rbf':
            # Radial basis functions
            KXX = np.exp(-KXX / (2*self.bandwidth**2))
            KXZ = np.exp(-KXZ / (2*self.bandwidth**2))

        # Collapse second kernel and normalize
        KXZ = N/M * np.sum(KXZ, axis=1)

        # Prepare for CVXOPT
        Q = matrix(KXX, tc='d')
        p = matrix(KXZ, tc='d')
        G = matrix(np.concatenate((np.ones((1, N)), -1*np.ones((1, N)),
                                   -1.*np.eye(N)), axis=0), tc='d')
        h = matrix(np.concatenate((np.array([N/np.sqrt(N) + N], ndmin=2),
                                   np.array([N/np.sqrt(N) - N], ndmin=2),
                                   np.zeros((N, 1))), axis=0), tc='d')

        # Call quadratic program solver
        sol = solvers.qp(Q, p, G, h)

        # Return optimal coefficients as importance weights
        return np.array(sol['x'])[:, 0]

    def fit(self, X, y, Z):
        """
        Fit/train an importance-weighted classifier.

        INPUT   (1) array 'X': source data (N samples by D features)
                (2) array 'y': source labels (N samples by 1)
                (3) array 'Z': target data (M samples by D features)
        OUTPUT  None
        """
        # Data shapes
        N, DX = X.shape
        M, DZ = Z.shape

        # Assert equivalent dimensionalities
        if not DX == DZ:
            raise ValueError('Dimensionalities of X and Z should be equal.')

        # Find importance-weights
        if self.iwe == 'lr':
            w = self.iwe_logistic_discrimination(X, Z)
        elif self.iwe == 'rg':
            w = self.iwe_ratio_gaussians(X, Z)
        elif self.iwe == 'nn':
            w = self.iwe_nearest_neighbours(X, Z)
        elif self.iwe == 'kde':
            w = self.iwe_kernel_densities(X, Z)
        elif self.iwe == 'kmm':
            w = self.iwe_kernel_mean_matching(X, Z)
        else:
            raise NotImplementedError('Estimator not implemented.')

        # Train a weighted classifier
        if self.loss == 'logistic':
            # Logistic regression model with sample weights
            self.clf.fit(X, y, w)
        elif self.loss == 'quadratic':
            # Least-squares model with sample weights
            self.clf.fit(X, y, w)
        elif self.loss == 'hinge':
            # Linear support vector machine with sample weights
            self.clf.fit(X, y, w)
        else:
            # Other loss functions are not implemented
            raise NotImplementedError('Loss function not implemented.')

        # Mark classifier as trained
        self.is_trained = True

        # Store training data dimensionality
        self.train_data_dim = DX

    def predict(self, Z_):
        """
        Make predictions on new dataset.

        INPUT   (1) array 'Z_': new data set (M samples by D features)
        OUTPUT  (1) array 'preds': label predictions (M samples by 1)
        """
        # Data shape
        M, D = Z_.shape

        # If classifier is trained, check for same dimensionality
        if self.is_trained:
            if not self.train_data_dim == D:
                raise ValueError('''Test data is of different dimensionality
                                 than training data.''')

        # Call scikit's predict function
        preds = self.clf.predict(Z_)

        # For quadratic loss function, correct predictions
        if self.loss == 'quadratic':
            preds = (np.sign(preds)+1)/2.

        # Return predictions array
        return preds

    def get_params(self):
        """Get classifier parameters."""
        return self.clf.get_params()

    def is_trained(self):
        """Check whether classifier is trained."""
        return self.is_trained
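
For completeness, a minimal usage sketch of the class listed above. The data are synthetic, the choice of iwe='nn' is arbitrary, and the import line is an assumption since this report does not show the module path; the classifier is assumed to be the one defined above.

import numpy as np

# Assumed import path; adjust to wherever the class lives in the package.
# from libtlda.iw import ImportanceWeightedClassifier

# Synthetic labelled source data, plus unlabelled, shifted target data
X = np.random.randn(100, 2)
y = (X[:, 0] > 0).astype(int)
Z = np.random.randn(50, 2) + 1.0

# Weight source samples with the nearest-neighbour estimator, then fit
clf = ImportanceWeightedClassifier(loss='logistic', iwe='nn')
clf.fit(X, y, Z)

# Predict labels for the target samples
preds = clf.predict(Z)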
387