Completed · branch master (f50597) · by Wouter · created · 52s

ImportanceWeightedClassifier.iwe_ratio_gaussians() (rating: D)

Complexity
    Conditions: 10

Size
    Total Lines: 67

Duplication
    Lines: 0
    Ratio: 0 %

Code Coverage
    Tests: 14
    CRAP Score: 17.2354

Importance
    Changes: 0

Metric   Value
cc       10
c        0
b        0
f        0
dl       0
loc      67
ccs      14
cts      24
cp       0.5833
crap     17.2354
rs       4.3902
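For reference, the CRAP Score above follows from the complexity and coverage figures in the table, assuming the standard CRAP formula (complexity squared times the cube of the uncovered fraction, plus complexity). The short check below is my own sketch, not part of the report:

# Sketch: reproduce the reported CRAP score from the metrics above,
# assuming the standard formula CRAP = cc**2 * (1 - coverage)**3 + cc.
cc = 10        # cyclomatic complexity ('Conditions' / 'cc' above)
cp = 0.5833    # coverage proportion ('cp' above, i.e. 14 of 24 statements covered)
crap = cc ** 2 * (1 - cp) ** 3 + cc
print(round(crap, 2))  # ~17.24, in line with the reported 17.2354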

How to fix: Long Method, Complexity

Long Method

Small methods make your code easier to understand, particularly when combined with a good name. Conversely, when a method is small, finding a good name for it is usually much easier.

For example, if you find yourself adding comments to a method's body, that is usually a good sign that the commented part should be extracted into a new method, with the comment as a starting point for naming it.

Commonly applied refactorings include Extract Method, as sketched below.
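A minimal before/after sketch of Extract Method (my own illustration with invented names, not code from this project):

# Before: a commented block inside a longer function.
def total_price(amounts):
    total = sum(amounts)
    # apply a 10% bulk discount for ten or more items
    if len(amounts) >= 10:
        total *= 0.9
    return total


# After Extract Method: the comment becomes the name of a small helper.
def apply_bulk_discount(total, n_items):
    """Apply a 10% discount when ten or more items are bought."""
    return total * 0.9 if n_items >= 10 else total


def total_price_refactored(amounts):
    return apply_bulk_discount(sum(amounts), len(amounts))


assert total_price([5.0] * 12) == total_price_refactored([5.0] * 12)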

Complexity

Complex code like ImportanceWeightedClassifier.iwe_ratio_gaussians(), and the class that contains it, often does a lot of different things. To break such a class down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
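Applied here, the iwe_ prefix already marks such a cohesive component: the five iwe_* estimators could move into their own class that the classifier delegates to. A rough Extract Class sketch (my own, with hypothetical names, not the project's actual design):

class ImportanceWeightEstimator(object):
    """Would collect the iwe_* methods that currently live on the classifier."""

    def __init__(self, method='lr', smoothing=True, clip=-1,
                 kernel_type='rbf', bandwidth=1):
        self.method = method
        self.smoothing = smoothing
        self.clip = clip
        self.kernel_type = kernel_type
        self.bandwidth = bandwidth

    def estimate(self, X, Z):
        """Dispatch to the chosen estimator ('lr', 'rg', 'nn', 'kde' or 'kmm')."""
        # The body of each iwe_* method would move here, e.g.:
        # if self.method == 'rg':
        #     return self._ratio_gaussians(X, Z)
        raise NotImplementedError('Sketch only; estimators not moved yet.')


class SlimImportanceWeightedClassifier(object):
    """Keeps only the loss handling and delegates weight estimation."""

    def __init__(self, loss='logistic', l2=1.0, iwe='lr', **kwargs):
        self.loss = loss
        self.l2 = l2
        self.weights = ImportanceWeightEstimator(method=iwe, **kwargs)

    def fit(self, X, y, Z):
        w = self.weights.estimate(X, Z)
        # ... train the weighted classifier exactly as in the current fit() ...

The full source of the module, as analysed, follows.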

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np
import scipy.stats as st
from scipy.spatial.distance import cdist
import sklearn as sk
from sklearn.svm import LinearSVC
from sklearn.linear_model import LogisticRegression, LinearRegression
from sklearn.model_selection import cross_val_predict
from os.path import basename
from cvxopt import matrix, solvers

from .util import is_pos_def


class ImportanceWeightedClassifier(object):
    """
    Class of importance-weighted classifiers.

    Methods contain different importance-weight estimators and different loss
    functions.
    """

    def __init__(self, loss='logistic', l2=1.0, iwe='lr', smoothing=True,
                 clip=-1, kernel_type='rbf', bandwidth=1):
        """
        Select a particular type of importance-weighted classifier.

        Parameters
        ----------
        loss : str
            loss function for weighted classifier, options: 'logistic',
            'quadratic', 'hinge' (def: 'logistic')
        l2 : float
            l2-regularization parameter value (def: 1.0)
        iwe : str
            importance weight estimator, options: 'lr', 'nn', 'rg', 'kmm',
            'kde' (def: 'lr')
        smoothing : bool
            whether to apply Laplace smoothing to the nearest-neighbour
            importance-weight estimator (def: True)
        clip : float
            maximum allowable importance-weight value; if set to -1, then the
            weights are not clipped (def: -1)
        kernel_type : str
            what type of kernel to use for kernel density estimation or kernel
            mean matching, options: 'diste', 'rbf' (def: 'rbf')
        bandwidth : float
            kernel bandwidth parameter value for kernel-based weight
            estimators (def: 1)

        Returns
        -------
        None

        Examples
        --------
        >>> clf = ImportanceWeightedClassifier()

        """
        self.loss = loss
        self.l2 = l2
        self.iwe = iwe
        self.smoothing = smoothing
        self.clip = clip
        self.kernel_type = kernel_type
        self.bandwidth = bandwidth

        # Initialize untrained classifiers based on choice of loss function
        if self.loss == 'logistic':
            # Logistic regression model
            self.clf = LogisticRegression()
        elif self.loss == 'quadratic':
            # Least-squares model
            self.clf = LinearRegression()
        elif self.loss == 'hinge':
            # Linear support vector machine
            self.clf = LinearSVC()
        else:
            # Other loss functions are not implemented
            raise NotImplementedError('Loss function not implemented.')

        # Whether model has been trained
        self.is_trained = False

        # Dimensionality of training data
        self.train_data_dim = ''

    def iwe_ratio_gaussians(self, X, Z):
        """
        Estimate importance weights based on a ratio of Gaussian distributions.

        Parameters
        ----------
        X : array
            source data (N samples by D features)
        Z : array
            target data (M samples by D features)

        Returns
        -------
        iw : array
            importance weights (N samples by 1)

        Examples
        --------
        X = np.random.randn(10, 2)
        Z = np.random.randn(10, 2)
        clf = ImportanceWeightedClassifier()
        iw = clf.iwe_ratio_gaussians(X, Z)

        """
        # Data shapes
        N, DX = X.shape
        M, DZ = Z.shape

        # Assert equivalent dimensionalities
        if not DX == DZ:
            raise ValueError('Dimensionalities of X and Z should be equal.')

        # Sample means in each domain
        mu_X = np.mean(X, axis=0)
        mu_Z = np.mean(Z, axis=0)

        # Sample covariances
        Si_X = np.cov(X.T)
        Si_Z = np.cov(Z.T)

        # Check for positive-definiteness of covariance matrices
        if not (is_pos_def(Si_X) and is_pos_def(Si_Z)):
            print('Warning: covariance matrices not PSD.')

            regct = -6
            while not (is_pos_def(Si_X) and is_pos_def(Si_Z)):
                print('Adding regularization: ' + str(10.**regct))

                # Add regularization
                Si_X += np.eye(DX)*10.**regct
                Si_Z += np.eye(DZ)*10.**regct

                # Increment regularization counter
                regct += 1

        # Compute probability of X under each domain
        pT = st.multivariate_normal.pdf(X, mu_Z, Si_Z)
        pS = st.multivariate_normal.pdf(X, mu_X, Si_X)

        # Check for numerical problems
        if np.any(np.isnan(pT)) or np.any(pT == 0):
            raise ValueError('Target probabilities are NaN or 0.')
        if np.any(np.isnan(pS)) or np.any(pS == 0):
            raise ValueError('Source probabilities are NaN or 0.')

        # Return the ratio of probabilities
        return pT / pS

    def iwe_kernel_densities(self, X, Z):
        """
        Estimate importance weights based on kernel density estimation.

        INPUT   (1) array 'X': source data (N samples by D features)
                (2) array 'Z': target data (M samples by D features)
        OUTPUT  (1) array: importance weights (N samples by 1)
        """
        # Data shapes
        N, DX = X.shape
        M, DZ = Z.shape

        # Assert equivalent dimensionalities
        if not DX == DZ:
            raise ValueError('Dimensionalities of X and Z should be equal.')

        # Compute probabilities under the target and source kernel densities
        pT = st.gaussian_kde(Z.T).pdf(X.T)
        pS = st.gaussian_kde(X.T).pdf(X.T)

        # Check for numerical problems
        if np.any(np.isnan(pT)) or np.any(pT == 0):
            raise ValueError('Target probabilities are NaN or 0.')
        if np.any(np.isnan(pS)) or np.any(pS == 0):
            raise ValueError('Source probabilities are NaN or 0.')

        # Return the ratio of probabilities
        return pT / pS

    def iwe_logistic_discrimination(self, X, Z):
        """
        Estimate importance weights based on logistic regression.

        INPUT   (1) array 'X': source data (N samples by D features)
                (2) array 'Z': target data (M samples by D features)
        OUTPUT  (1) array: importance weights (N samples by 1)
        """
        # Data shapes
        N, DX = X.shape
        M, DZ = Z.shape

        # Assert equivalent dimensionalities
        if not DX == DZ:
            raise ValueError('Dimensionalities of X and Z should be equal.')

        # Make domain-label variable
        y = np.concatenate((np.zeros((N, 1)),
                            np.ones((M, 1))), axis=0)

        # Concatenate data
        XZ = np.concatenate((X, Z), axis=0)

        # Call a logistic regressor
        lr = LogisticRegression(C=self.l2)

        # Predict probability of belonging to the target domain using
        # cross-validation (column 1 of predict_proba is the target class)
        preds = cross_val_predict(lr, XZ, y[:, 0],
                                  method='predict_proba')[:, 1]

        # Return predictions for source samples
        return preds[:N]

    def iwe_nearest_neighbours(self, X, Z):
        """
        Estimate importance weights based on nearest-neighbours.

        INPUT   (1) array 'X': source data (N samples by D features)
                (2) array 'Z': target data (M samples by D features)
        OUTPUT  (1) array: importance weights (N samples by 1)
        """
        # Data shapes
        N, DX = X.shape
        M, DZ = Z.shape

        # Assert equivalent dimensionalities
        if not DX == DZ:
            raise ValueError('Dimensionalities of X and Z should be equal.')

        # Compute Euclidean distance between samples
        d = cdist(X, Z, metric='euclidean')

        # Count target samples within each source Voronoi cell
        ix = np.argmin(d, axis=0)  # nearest source sample for each target sample
        iw, _ = np.histogram(ix, np.arange(N+1))

        # Laplace smoothing
        if self.smoothing:
            iw = (iw + 1.) / (N + 1)

        # Weight clipping
        if self.clip > 0:
            iw = np.minimum(self.clip, np.maximum(0, iw))

        # Return weights
        return iw

    def iwe_kernel_mean_matching(self, X, Z):
        """
        Estimate importance weights based on kernel mean matching.

        INPUT   (1) array 'X': source data (N samples by D features)
                (2) array 'Z': target data (M samples by D features)
        OUTPUT  (1) array: importance weights (N samples by 1)
        """
        # Data shapes
        N, DX = X.shape
        M, DZ = Z.shape

        # Assert equivalent dimensionalities
        if not DX == DZ:
            raise ValueError('Dimensionalities of X and Z should be equal.')

        # Compute sample pairwise distances
        KXX = cdist(X, X, metric='euclidean')
        KXZ = cdist(X, Z, metric='euclidean')

        # Check for negative distances
        if not np.all(KXX >= 0):
            raise ValueError('Negative distance in source kernel.')
        if not np.all(KXZ >= 0):
            raise ValueError('Negative distance in source-target kernel.')

        # Compute kernels
        if self.kernel_type == 'rbf':
            # Radial basis functions
            KXX = np.exp(-KXX / (2*self.bandwidth**2))
            KXZ = np.exp(-KXZ / (2*self.bandwidth**2))

        # Collapse second kernel and normalize
        KXZ = N/M * np.sum(KXZ, axis=1)

        # Prepare for CVXOPT
        Q = matrix(KXX, tc='d')
        p = matrix(KXZ, tc='d')
        G = matrix(np.concatenate((np.ones((1, N)), -1*np.ones((1, N)),
                                   -1.*np.eye(N)), axis=0), tc='d')
        h = matrix(np.concatenate((np.array([N/np.sqrt(N) + N], ndmin=2),
                                   np.array([N/np.sqrt(N) - N], ndmin=2),
                                   np.zeros((N, 1))), axis=0), tc='d')

        # Call quadratic program solver
        sol = solvers.qp(Q, p, G, h)

        # Return optimal coefficients as importance weights
        return np.array(sol['x'])[:, 0]

    def fit(self, X, y, Z):
        """
        Fit/train an importance-weighted classifier.

        INPUT   (1) array 'X': source data (N samples by D features)
                (2) array 'y': source labels (N samples by 1)
                (3) array 'Z': target data (M samples by D features)
        OUTPUT  None
        """
        # Data shapes
        N, DX = X.shape
        M, DZ = Z.shape

        # Assert equivalent dimensionalities
        if not DX == DZ:
            raise ValueError('Dimensionalities of X and Z should be equal.')

        # Find importance-weights
        if self.iwe == 'lr':
            w = self.iwe_logistic_discrimination(X, Z)
        elif self.iwe == 'rg':
            w = self.iwe_ratio_gaussians(X, Z)
        elif self.iwe == 'nn':
            w = self.iwe_nearest_neighbours(X, Z)
        elif self.iwe == 'kde':
            w = self.iwe_kernel_densities(X, Z)
        elif self.iwe == 'kmm':
            w = self.iwe_kernel_mean_matching(X, Z)
        else:
            raise NotImplementedError('Estimator not implemented.')

        # Train a weighted classifier
        if self.loss == 'logistic':
            # Logistic regression model with sample weights
            self.clf.fit(X, y, w)
        elif self.loss == 'quadratic':
            # Least-squares model with sample weights
            self.clf.fit(X, y, w)
        elif self.loss == 'hinge':
            # Linear support vector machine with sample weights
            self.clf.fit(X, y, w)
        else:
            # Other loss functions are not implemented
            raise NotImplementedError('Loss function not implemented.')

        # Mark classifier as trained
        self.is_trained = True

        # Store training data dimensionality
        self.train_data_dim = DX

    def predict(self, Z_):
        """
        Make predictions on new dataset.

        INPUT   (1) array 'Z_': new data set (M samples by D features)
        OUTPUT  (1) array 'preds': label predictions (M samples by 1)
        """
        # Data shape
        M, D = Z_.shape

        # If classifier is trained, check for same dimensionality
        if self.is_trained:
            if not self.train_data_dim == D:
                raise ValueError('''Test data is of different dimensionality
                                 than training data.''')

        # Call scikit's predict function
        preds = self.clf.predict(Z_)

        # For quadratic loss function, correct predictions
        if self.loss == 'quadratic':
            preds = (np.sign(preds)+1)/2.

        # Return predictions array
        return preds

    def get_params(self):
        """Get classifier parameters."""
        return self.clf.get_params()

    def is_trained(self):
        """Check whether classifier is trained."""
        return self.is_trained
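For context, a minimal usage sketch of the class on synthetic data (my own example; the import path is hypothetical and should be adjusted to wherever the module lives in the installed package):

import numpy as np
# from libtlda.iw import ImportanceWeightedClassifier  # hypothetical import path

# Labelled source samples and unlabelled, shifted target samples.
X = np.random.randn(100, 2)
y = (X[:, 0] > 0).astype(int)
Z = np.random.randn(50, 2) + 0.5

# Ratio-of-Gaussians weight estimator with the default logistic loss.
clf = ImportanceWeightedClassifier(loss='logistic', iwe='rg')
clf.fit(X, y, Z)
preds = clf.predict(Z)
print(preds.shape)  # (50,)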