Passed
Push — master (189cd4...596988) by Shlomi
created 01:52

BiasWordsEmbedding._identify_direction() — rated C

Complexity: Conditions 9
Size: Total Lines 53, Code Lines 38
Duplication: Lines 0 (Ratio 0%)
Importance: Changes 0
Metric                       Value
cc (cyclomatic complexity)   9
eloc (code lines)            38
nop (number of parameters)   5
dl (duplicated lines)        0
loc (total lines)            53
rs                           6.6346
c                            0
b                            0
f                            0

How to fix: Long Method

Small methods make your code easier to understand, especially when combined with good names. Moreover, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, that is usually a good sign that you should extract the commented part into a new method, using the comment as a starting point for its name.

Commonly applied refactorings include Extract Method, as sketched below.
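As a minimal, hedged sketch of Extract Method applied to the flagged _identify_direction (the helper names _identify_direction_by_single and _identify_direction_by_sum are illustrative, not part of the actual codebase): each branch of the method dispatch becomes a small, well-named helper.

    def _identify_direction_by_single(self, definitional):
        # The direction is the normalized difference of a single
        # definitional pair.
        return normalize(normalize(self[definitional[0]])
                         - normalize(self[definitional[1]]))

    def _identify_direction_by_sum(self, definitional):
        # Sum the vectors of each group, then take the normalized
        # difference of the normalized group sums.
        groups = list(zip(*definitional))
        group1_sum_vector = np.sum([self[word]
                                    for word in groups[0]], axis=0)
        group2_sum_vector = np.sum([self[word]
                                    for word in groups[1]], axis=0)
        return normalize(normalize(group1_sum_vector)
                         - normalize(group2_sum_vector))

    # The corresponding branches of _identify_direction then collapse to:
    #
    #     if method == 'single':
    #         direction = self._identify_direction_by_single(definitional)
    #     elif method == 'sum':
    #         direction = self._identify_direction_by_sum(definitional)

Each helper now carries a single responsibility, which lowers the method's cyclomatic complexity (the cc of 9 in the report) without changing behavior.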

import copy
import os
import warnings

import matplotlib.pylab as plt
import numpy as np
import pandas as pd
import seaborn as sns
from gensim.models.keyedvectors import KeyedVectors
from pkg_resources import resource_filename
from sklearn.decomposition import PCA
from sklearn.svm import LinearSVC
from tqdm import tqdm

from tabulate import tabulate

from ..consts import RANDOM_STATE
from .utils import (
    cosine_similarity, normalize, project_reject_vector, project_vector,
    reject_vector, update_word_vector,
)


DIRECTION_METHODS = ['single', 'sum', 'pca']
DEBIAS_METHODS = ['neutralize', 'hard', 'soft']
FIRST_PC_THRESHOLD = 0.5
MAX_NON_SPECIFIC_EXAMPLES = 1000


class BiasWordsEmbedding:

    def __init__(self, model, only_lower=True, verbose=False):
        if not isinstance(model, KeyedVectors):
            raise TypeError('model should be of type KeyedVectors, not {}'
                            .format(type(model)))

        self.model = model

        # TODO: write a unit test for when only_lower is False
        self.only_lower = only_lower

        self._verbose = verbose

        self.direction = None
        self.positive_end = None
        self.negative_end = None

    def __copy__(self):
        bias_words_embedding = self.__class__(self.model)
        bias_words_embedding.direction = copy.deepcopy(self.direction)
        bias_words_embedding.positive_end = copy.deepcopy(self.positive_end)
        bias_words_embedding.negative_end = copy.deepcopy(self.negative_end)
        return bias_words_embedding

    def __deepcopy__(self, memo):
        bias_words_embedding = copy.copy(self)
        bias_words_embedding.model = copy.deepcopy(bias_words_embedding.model)
        return bias_words_embedding

    def __getitem__(self, key):
        return self.model[key]

    def __contains__(self, item):
        return item in self.model

    def _filter_words_by_model(self, words):
        return [word for word in words if word in self]

    def _is_direction_identified(self):
        if self.direction is None:
            raise RuntimeError('The direction was not identified'
                               ' for this {} instance'
                               .format(self.__class__.__name__))

    # There is a mistake in the article.
    # Section 5.1 says:
    # "To identify the gender subspace, we took the ten gender pair difference
    # vectors and computed its principal components (PCs)"
    # However, the source code does it differently:
    # https://github.com/tolga-b/debiaswe/blob/10277b23e187ee4bd2b6872b507163ef4198686b/debiaswe/we.py#L235-L245
    def _identify_subspace_by_pca(self, definitional_pairs, n_components):
        matrix = []

        for word1, word2 in definitional_pairs:
            vector1 = normalize(self[word1])
            vector2 = normalize(self[word2])

            center = (vector1 + vector2) / 2

            matrix.append(vector1 - center)
            matrix.append(vector2 - center)

        pca = PCA(n_components=n_components)
        pca.fit(matrix)

        if self._verbose:
            table = enumerate(pca.explained_variance_ratio_, start=1)
            headers = ['Principal Component',
                       'Explained Variance Ratio']
            print(tabulate(table, headers=headers))

        return pca

    # TODO: add the SVD method from section 6, step 1.
    # It seems there is a mistake there; I think it is the same as PCA,
    # just with SVD replacing it.
    def _identify_direction(self, positive_end, negative_end,
                            definitional, method='pca'):
        if method not in DIRECTION_METHODS:
            raise ValueError('method should be one of {}, {} was given'.format(
                DIRECTION_METHODS, method))

        if positive_end == negative_end:
            raise ValueError('positive_end and negative_end'
                             ' should be different, and not the same "{}"'
                             .format(positive_end))
        if self._verbose:
            print('Identifying direction using the {} method...'
                  .format(method))

        direction = None

        if method == 'single':
            direction = normalize(normalize(self[definitional[0]])
                                  - normalize(self[definitional[1]]))

        elif method == 'sum':
            groups = list(zip(*definitional))

            group1_sum_vector = np.sum([self[word]
                                        for word in groups[0]], axis=0)
            group2_sum_vector = np.sum([self[word]
                                        for word in groups[1]], axis=0)

            diff_vector = (normalize(group1_sum_vector)
                           - normalize(group2_sum_vector))

            direction = normalize(diff_vector)

        elif method == 'pca':
            pca = self._identify_subspace_by_pca(definitional, 10)
            if pca.explained_variance_ratio_[0] < FIRST_PC_THRESHOLD:
                raise RuntimeError('The explained variance'
                                   ' of the first principal component should'
                                   ' be at least {}, but it is {}'
                                   .format(FIRST_PC_THRESHOLD,
                                           pca.explained_variance_ratio_[0]))
            direction = pca.components_[0]

        # Flip the direction if it is opposite (e.g., we cannot control
        # the sign of what the PCA returns)
        ends_diff_projection = cosine_similarity((self[positive_end]
                                                  - self[negative_end]),
                                                 direction)
        if ends_diff_projection < 0:
            direction = -direction  # pylint: disable=invalid-unary-operand-type

        self.direction = direction
        self.positive_end = positive_end
        self.negative_end = negative_end

    def project_on_direction(self, word):
        self._is_direction_identified()

        vector = self[word]
        projection_score = self.model.cosine_similarities(self.direction,
                                                          [vector])[0]
        return projection_score

    def _calc_projection_scores(self, words):
        self._is_direction_identified()

        df = pd.DataFrame({'word': words})

        # TODO: maybe use cosine_similarities on all the vectors at once?
        # It might be faster.
        df['projection'] = df['word'].apply(self.project_on_direction)
        df = df.sort_values('projection', ascending=False)

        return df

    def plot_projection_scores(self, words,
                               ax=None, axis_projection_step=None):
        self._is_direction_identified()

        projections_df = self._calc_projection_scores(words)
        projections_df['projection'] = projections_df['projection'].round(2)

        if ax is None:
            _, ax = plt.subplots(1)

        if axis_projection_step is None:
            axis_projection_step = 0.1

        cmap = plt.get_cmap('RdBu')
        projections_df['color'] = ((projections_df['projection'] + 0.5)
                                   .apply(cmap))

        most_extreme_projection = (projections_df['projection']
                                   .abs()
                                   .max()
                                   .round(1))

        sns.barplot(x='projection', y='word', data=projections_df,
                    palette=projections_df['color'])

        plt.xticks(np.arange(-most_extreme_projection,
                             most_extreme_projection,
                             axis_projection_step))
        plt.title('← {} {} {} →'.format(self.negative_end,
                                        ' ' * 20,
                                        self.positive_end))

        plt.xlabel('Direction Projection')
        plt.ylabel('Words')

        return ax

    def plot_dist_projections_on_direction(self, word_groups, ax=None):
        if ax is None:
            _, ax = plt.subplots(1)

        names = sorted(word_groups.keys())

        for name in names:
            words = word_groups[name]
            label = '{} (#{})'.format(name, len(words))
            vectors = [self[word] for word in words]
            projections = self.model.cosine_similarities(self.direction,
                                                         vectors)
            sns.distplot(projections, hist=False, label=label, ax=ax)

        plt.axvline(0, color='k', linestyle='--')

        plt.title('← {} {} {} →'.format(self.negative_end,
                                        ' ' * 20,
                                        self.positive_end))
        plt.xlabel('Direction Projection')
        plt.ylabel('Density')
        ax.legend(loc='center left', bbox_to_anchor=(1, 0.5))

        return ax

    def calc_direct_bias(self, neutral_words, c=None):
        if c is None:
            c = 1

        projections = self._calc_projection_scores(neutral_words)['projection']
        direct_bias_terms = np.abs(projections) ** c
        direct_bias = direct_bias_terms.sum() / len(neutral_words)

        return direct_bias

    def calc_indirect_bias(self, word1, word2):
        """Also known in the article as PairBias."""
        self._is_direction_identified()

        vector1 = normalize(self[word1])
        vector2 = normalize(self[word2])

        perpendicular_vector1 = reject_vector(vector1, self.direction)
        perpendicular_vector2 = reject_vector(vector2, self.direction)

        inner_product = vector1 @ vector2
        perpendicular_similarity = cosine_similarity(perpendicular_vector1,
                                                     perpendicular_vector2)

        indirect_bias = ((inner_product - perpendicular_similarity)
                         / inner_product)
        return indirect_bias

    def _extract_neutral_words(self, specific_words):
        extended_specific_words = set()

        # because our specific_full data was trained on a partial
        # words embedding
        for word in specific_words:
            extended_specific_words.add(word)
            extended_specific_words.add(word.lower())
            extended_specific_words.add(word.upper())
            extended_specific_words.add(word.title())

        neutral_words = [word for word in self.model.vocab
                         if word not in extended_specific_words]

        return neutral_words

    def _neutralize(self, neutral_words):
        self._is_direction_identified()

        if self._verbose:
            neutral_words_iter = tqdm(neutral_words)
        else:
            neutral_words_iter = iter(neutral_words)

        for word in neutral_words_iter:
            neutralized_vector = reject_vector(self[word],
                                               self.direction)
            update_word_vector(self.model, word, neutralized_vector)

        self.model.init_sims(replace=True)

    def _equalize(self, equality_sets):
        for equality_set_words in equality_sets:
            equality_set_vectors = [normalize(self[word])
                                    for word in equality_set_words]
            center = np.mean(equality_set_vectors, axis=0)
            (projected_center,
             rejected_center) = project_reject_vector(center,
                                                      self.direction)

            for word, vector in zip(equality_set_words, equality_set_vectors):
                projected_vector = project_vector(vector, self.direction)

                projected_part = normalize(projected_vector - projected_center)
                scaling = np.sqrt(1 - np.linalg.norm(rejected_center)**2)

                # TODO: it is done differently in the source code - why?
                # equalized_vector = rejected_center + scaling * self.direction
                # https://github.com/tolga-b/debiaswe/blob/10277b23e187ee4bd2b6872b507163ef4198686b/debiaswe/debias.py#L36-L37
                equalized_vector = rejected_center + scaling * projected_part

                update_word_vector(self.model, word, equalized_vector)

        self.model.init_sims(replace=True)

    def debias(self, method='hard', neutral_words=None, equality_sets=None,
               inplace=True):
        # pylint: disable=W0212
        if inplace:
            bias_words_embedding = self
        else:
            bias_words_embedding = copy.deepcopy(self)

        if method not in DEBIAS_METHODS:
            raise ValueError('method should be one of {}, {} was given'.format(
                DEBIAS_METHODS, method))

        if method in ['hard', 'neutralize']:
            if self._verbose:
                print('Neutralize...')
            bias_words_embedding._neutralize(neutral_words)

        if method == 'hard':
            if self._verbose:
                print('Equalize...')
            bias_words_embedding._equalize(equality_sets)

        if inplace:
            return None
        else:
            return bias_words_embedding

    def evaluate_words_embedding(self):
        with warnings.catch_warnings():
            warnings.simplefilter('ignore', category=FutureWarning)

            if self._verbose:
                print('Evaluate word pairs...')
            word_pairs_path = resource_filename(__name__,
                                                os.path.join('data',
                                                             'evaluation',
                                                             'wordsim353.tsv'))
            word_pairs_result = self.model.evaluate_word_pairs(word_pairs_path)

            if self._verbose:
                print('Evaluate analogies...')
            analogies_path = resource_filename(__name__,
                                               os.path.join('data',
                                                            'evaluation',
                                                            'questions-words.txt'))  # pylint: disable=C0301
            analogies_result = self.model.evaluate_word_analogies(analogies_path)  # pylint: disable=C0301

        if self._verbose:
            print()
        print('From Gensim')
        print()
        print('-' * 30)
        print()
        print('Word Pairs Result - WordSimilarity-353:')
        print('~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~')
        print('Pearson correlation coefficient:', word_pairs_result[0])
        print('Spearman rank-order correlation coefficient'
              ' between the similarities from the dataset'
              ' and the similarities produced by the model itself:',
              word_pairs_result[1])
        print('Ratio of pairs with unknown words:', word_pairs_result[2])
        print()
        print('-' * 30)
        print()
        print('Analogies Result')
        print('~~~~~~~~~~~~~~~~')
        print('Overall evaluation score:', analogies_result[0])

    def learn_full_specific_words(self, seed_specific_words,
                                  max_non_specific_examples=None, debug=None):

        if debug is None:
            debug = False

        if max_non_specific_examples is None:
            max_non_specific_examples = MAX_NON_SPECIFIC_EXAMPLES

        data = []
        non_specific_example_count = 0

        for word in self.model.vocab:
            is_specific = word in seed_specific_words

            if not is_specific:
                non_specific_example_count += 1
                if non_specific_example_count <= max_non_specific_examples:
                    data.append((self[word], is_specific))
            else:
                data.append((self[word], is_specific))

        np.random.seed(RANDOM_STATE)
        np.random.shuffle(data)

        X, y = zip(*data)

        X = np.array(X)
        X /= np.linalg.norm(X, axis=1)[:, None]

        y = np.array(y).astype('int')

        clf = LinearSVC(C=1, class_weight='balanced',
                        random_state=RANDOM_STATE)

        clf.fit(X, y)

        full_specific_words = []
        for word in self.model.vocab:
            vector = [normalize(self[word])]
            if clf.predict(vector):
                full_specific_words.append(word)

        if not debug:
            return full_specific_words, clf

        return full_specific_words, clf, X, y