Passed
Push — master (cde900...7ad769)
by Shlomi
created 01:53

BiasWordsEmbedding.evaluate_words_embedding() — grade B

Complexity:   Conditions 5
Size:         Total Lines 40, Code Lines 35
Duplication:  Lines 0 (0 %)
Importance:   Changes 0

Metric  Value
cc      5
eloc    35
nop     2
dl      0
loc     40
rs      8.5733
c       0
b       0
f       0
import copy
import os
import warnings

import matplotlib.pylab as plt
import numpy as np
import pandas as pd
import seaborn as sns
from gensim.models.keyedvectors import KeyedVectors
from pkg_resources import resource_filename
from sklearn.decomposition import PCA
from sklearn.svm import LinearSVC
from tqdm import tqdm

from ..consts import RANDOM_STATE
from .data import BOLUKBASI_DATA
from .utils import (
    cosine_similarity, generate_one_word_forms, generate_words_forms,
    normalize, project_reject_vector, project_vector, reject_vector,
    update_word_vector,
)


DIRECTION_METHODS = ['single', 'sum', 'pca']
DEBIAS_METHODS = ['neutralize', 'hard', 'soft']
FIRST_PC_THRESHOLD = 0.5
MAX_NON_SPECIFIC_EXAMPLES = 1000


class BiasWordsEmbedding:

    def __init__(self, model, only_lower=True):
        if not isinstance(model, KeyedVectors):
            raise TypeError('model should be of type KeyedVectors, not {}'
                            .format(type(model)))

        self.model = model

        # TODO: write a unit test for when it is False
        self.only_lower = only_lower

        self.direction = None
        self.positive_end = None
        self.negative_end = None

    def __copy__(self):
        # pass only_lower through, so the copy keeps the same casing behavior
        bias_words_embedding = self.__class__(self.model, self.only_lower)
        bias_words_embedding.direction = copy.deepcopy(self.direction)
        bias_words_embedding.positive_end = copy.deepcopy(self.positive_end)
        bias_words_embedding.negative_end = copy.deepcopy(self.negative_end)
        return bias_words_embedding

    def __deepcopy__(self, memo):
        bias_words_embedding = copy.copy(self)
        bias_words_embedding.model = copy.deepcopy(bias_words_embedding.model)
        return bias_words_embedding

    def __getitem__(self, key):
        return self.model[key]

    def _is_direction_identified(self):
        if self.direction is None:
            raise RuntimeError('The direction was not identified'
                               ' for this {} instance'
                               .format(self.__class__.__name__))

    # There is a mistake in the article.
    # It is written (section 5.1):
    # "To identify the gender subspace, we took the ten gender pair difference
    # vectors and computed its principal components (PCs)"
    # However, in the source code -
    # https://github.com/tolga-b/debiaswe/blob/10277b23e187ee4bd2b6872b507163ef4198686b/debiaswe/we.py#L235-L245
    # - each pair is centered around its mean before the PCA, as implemented
    # here.
    def _identify_subspace_by_pca(self, definitional_pairs, n_components):
        matrix = []

        for word1, word2 in definitional_pairs:
            vector1 = normalize(self[word1])
            vector2 = normalize(self[word2])

            center = (vector1 + vector2) / 2

            matrix.append(vector1 - center)
            matrix.append(vector2 - center)

        pca = PCA(n_components=n_components)
        pca.fit(matrix)

        return pca

    # TODO: add the SVD method from section 6 step 1.
    # It seems there is a mistake there - I think it is the same as PCA,
    # just replacing PCA with SVD.
    def _identify_direction(self, positive_end, negative_end,
                            definitional, method='pca'):
        if method not in DIRECTION_METHODS:
            raise ValueError('method should be one of {}, {} was given'.format(
                DIRECTION_METHODS, method))

        if positive_end == negative_end:
            raise ValueError('positive_end and negative_end'
                             ' should be different, and not the same "{}"'
                             .format(positive_end))

        direction = None

        if method == 'single':
            direction = normalize(normalize(self[definitional[0]])
                                  - normalize(self[definitional[1]]))

        elif method == 'sum':
            groups = list(zip(*definitional))

            group1_sum_vector = np.sum([self[word]
                                        for word in groups[0]], axis=0)
            group2_sum_vector = np.sum([self[word]
                                        for word in groups[1]], axis=0)

            diff_vector = (normalize(group1_sum_vector)
                           - normalize(group2_sum_vector))

            direction = normalize(diff_vector)

        elif method == 'pca':
            pca = self._identify_subspace_by_pca(definitional, 1)
            if pca.explained_variance_ratio_[0] < FIRST_PC_THRESHOLD:
                raise RuntimeError('The explained variance'
                                   ' of the first principal component should'
                                   ' be at least {}, but it is {}'
                                   .format(FIRST_PC_THRESHOLD,
                                           pca.explained_variance_ratio_[0]))
            direction = pca.components_[0]

        # flip the direction if it is opposite (e.g., we cannot control
        # the sign of what the PCA returns)
        ends_diff_projection = cosine_similarity((self[positive_end]
                                                  - self[negative_end]),
                                                 direction)
        if ends_diff_projection < 0:
            direction = -direction  # pylint: disable=invalid-unary-operand-type

        self.direction = direction
        self.positive_end = positive_end
        self.negative_end = negative_end
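
    # For example, identifying a gender direction with the 'sum' method
    # (a minimal sketch; `bias_we` is a hypothetical instance, and the two
    # definitional pairs are a subset of the ten used in the article):
    #
    #     bias_we._identify_direction('he', 'she',
    #                                 [('he', 'she'), ('man', 'woman')],
    #                                 method='sum')
    #     bias_we.direction  # unit vector pointing towards positive_end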

    def project_on_direction(self, word):
        self._is_direction_identified()

        vector = self[word]
        projection_score = self.model.cosine_similarities(self.direction,
                                                          [vector])[0]
        return projection_score

    def _calc_projection_scores(self, words):
        self._is_direction_identified()

        df = pd.DataFrame({'word': words})

        # TODO: maybe using cosine_similarities on all the vectors?
        # it might be faster
        df['projection'] = df['word'].apply(self.project_on_direction)
        df = df.sort_values('projection', ascending=False)

        return df

    def plot_projection_scores(self, words,
                               ax=None, axis_projection_step=None):
        self._is_direction_identified()

        projections_df = self._calc_projection_scores(words)
        projections_df['projection'] = projections_df['projection'].round(2)

        if ax is None:
            _, ax = plt.subplots(1)

        if axis_projection_step is None:
            axis_projection_step = 0.1

        cmap = plt.get_cmap('RdBu')
        projections_df['color'] = ((projections_df['projection'] + 0.5)
                                   .apply(cmap))

        most_extreme_projection = (projections_df['projection']
                                   .abs()
                                   .max()
                                   .round(1))

        sns.barplot(x='projection', y='word', data=projections_df,
                    palette=projections_df['color'])

        plt.xticks(np.arange(-most_extreme_projection,
                             most_extreme_projection,
                             axis_projection_step))
        plt.title('← {} {} {} →'.format(self.negative_end,
                                        ' ' * 20,
                                        self.positive_end))

        plt.xlabel('Direction Projection')
        plt.ylabel('Words')
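
    # Usage sketch (assuming `gender_bias_we` is a GenderBiasWE instance,
    # defined below; the words are arbitrary examples from the vocabulary):
    #
    #     gender_bias_we.plot_projection_scores(['nurse', 'engineer',
    #                                            'teacher', 'warrior'])
    #     plt.show()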

    def calc_direct_bias(self, neutral_words, c=None):
        if c is None:
            c = 1

        projections = self._calc_projection_scores(neutral_words)['projection']
        direct_bias_terms = np.abs(projections) ** c
        direct_bias = direct_bias_terms.sum() / len(neutral_words)

        return direct_bias
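
    # calc_direct_bias above follows the DirectBias measure from section 5.2
    # of Bolukbasi et al. (2016):
    #
    #     DirectBias_c = (1 / |N|) * sum(|cos(w, g)| ** c for w in N)
    #
    # where N is the set of neutral words, g is the bias direction, and c
    # controls the strictness of the measure (c=1 by default above).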

    def calc_indirect_bias(self, word1, word2):
        """Also known in the article as PairBias."""
        self._is_direction_identified()

        vector1 = normalize(self[word1])
        vector2 = normalize(self[word2])

        perpendicular_vector1 = reject_vector(vector1, self.direction)
        perpendicular_vector2 = reject_vector(vector2, self.direction)

        inner_product = vector1 @ vector2
        perpendicular_similarity = cosine_similarity(perpendicular_vector1,
                                                     perpendicular_vector2)

        indirect_bias = ((inner_product - perpendicular_similarity)
                         / inner_product)
        return indirect_bias
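
    # For example (a word pair discussed in the article; both words must be
    # in the model's vocabulary):
    #
    #     gender_bias_we.calc_indirect_bias('softball', 'receptionist')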

    def _extract_neutral_words(self, specific_words):
        extended_specific_words = set()

        # because our specific_full data was trained on a partial
        # words embedding
        for word in specific_words:
            extended_specific_words.add(word)
            extended_specific_words.add(word.lower())
            extended_specific_words.add(word.upper())
            extended_specific_words.add(word.title())

        neutral_words = [word for word in self.model.vocab
                         if word not in extended_specific_words]

        return neutral_words

    def _neutralize(self, neutral_words, verbose=False):
        self._is_direction_identified()

        if verbose:
            neutral_words_iter = tqdm(neutral_words)
        else:
            neutral_words_iter = iter(neutral_words)

        for word in neutral_words_iter:
            neutralized_vector = reject_vector(self[word],
                                               self.direction)
            update_word_vector(self.model, word, neutralized_vector)

        self.model.init_sims(replace=True)

    def _equalize(self, equality_sets):
        for equality_set_words in equality_sets:
            equality_set_vectors = [normalize(self[word])
                                    for word in equality_set_words]
            center = np.mean(equality_set_vectors, axis=0)
            (projected_center,
             rejected_center) = project_reject_vector(center,
                                                      self.direction)

            for word, vector in zip(equality_set_words, equality_set_vectors):
                projected_vector = project_vector(vector, self.direction)

                projected_part = normalize(projected_vector - projected_center)
                scaling = np.sqrt(1 - np.linalg.norm(rejected_center)**2)

                # TODO: in the article's code it is different - why?
                # equalized_vector = rejected_center + scaling * self.direction
                # https://github.com/tolga-b/debiaswe/blob/10277b23e187ee4bd2b6872b507163ef4198686b/debiaswe/debias.py#L36-L37
                equalized_vector = rejected_center + scaling * projected_part

                update_word_vector(self.model, word, equalized_vector)

        self.model.init_sims(replace=True)
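
    # _equalize above follows section 6 of Bolukbasi et al. (2016): each
    # word w in an equality set E is moved to
    #
    #     w := nu + sqrt(1 - ||nu||^2) * (w_B - mu_B) / ||w_B - mu_B||
    #
    # where mu is the mean of E, nu is mu's rejection from the bias
    # direction, and the _B subscript denotes projection onto the direction,
    # so the words in E end up equidistant from every neutralized word.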

    def debias(self, method='hard', neutral_words=None, equality_sets=None,
               inplace=True, verbose=False):
        # pylint: disable=W0212
        if inplace:
            bias_words_embedding = self
        else:
            bias_words_embedding = copy.deepcopy(self)

        if method not in DEBIAS_METHODS:
            raise ValueError('method should be one of {}, {} was given'.format(
                DEBIAS_METHODS, method))

        # NOTE: 'soft' passes the validation above, but soft debias is not
        # implemented yet, so it currently results in a no-op
        if method in ['hard', 'neutralize']:
            if verbose:
                print('Neutralize...')
            bias_words_embedding._neutralize(neutral_words, verbose)

        if method == 'hard':
            if verbose:
                print('Equalize...')
            bias_words_embedding._equalize(equality_sets)

        if inplace:
            return None
        else:
            return bias_words_embedding
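
    # Usage sketch (a non-destructive hard debias; with GenderBiasWE below,
    # neutral_words and equality_sets have sensible defaults):
    #
    #     debiased_we = gender_bias_we.debias('hard', inplace=False)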

    def evaluate_words_embedding(self, verbose=False):
        with warnings.catch_warnings():
            warnings.simplefilter('ignore', category=FutureWarning)

            if verbose:
                print('Evaluate word pairs...')
            word_pairs_path = resource_filename(__name__,
                                                os.path.join('data',
                                                             'evaluation',
                                                             'wordsim353.tsv'))
            word_pairs_result = self.model.evaluate_word_pairs(word_pairs_path)

            if verbose:
                print('Evaluate analogies...')
            analogies_path = resource_filename(__name__,
                                               os.path.join('data',
                                                            'evaluation',
                                                            'questions-words.txt'))  # pylint: disable=C0301
            analogies_result = self.model.evaluate_word_analogies(analogies_path)  # pylint: disable=C0301

        if verbose:
            print()
        print('From Gensim')
        print()
        print('-' * 30)
        print()
        print('Word Pairs Result - WordSimilarity-353:')
        print('~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~')
        print('Pearson correlation coefficient:', word_pairs_result[0])
        print('Spearman rank-order correlation coefficient'
              ' between the similarities from the dataset'
              ' and the similarities produced by the model itself:',
              word_pairs_result[1])
        print('Ratio of pairs with unknown words:', word_pairs_result[2])
        print()
        print('-' * 30)
        print()
        print('Analogies Result')
        print('~~~~~~~~~~~~~~~~')
        print('Overall evaluation score:', analogies_result[0])
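
    # A typical check is to run this before and after debias(), to verify
    # that debiasing preserved the general quality of the embedding:
    #
    #     gender_bias_we.evaluate_words_embedding()
    #     gender_bias_we.debias('hard')
    #     gender_bias_we.evaluate_words_embedding()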

    def learn_full_specific_words(self, seed_specific_words,
                                  max_non_specific_examples=None, debug=None):

        if debug is None:
            debug = False

        if max_non_specific_examples is None:
            max_non_specific_examples = MAX_NON_SPECIFIC_EXAMPLES

        data = []
        non_specific_example_count = 0

        for word in self.model.vocab:
            is_specific = word in seed_specific_words

            if not is_specific:
                non_specific_example_count += 1
                if non_specific_example_count <= max_non_specific_examples:
                    data.append((self[word], is_specific))
            else:
                data.append((self[word], is_specific))

        np.random.seed(RANDOM_STATE)
        np.random.shuffle(data)

        X, y = zip(*data)

        X = np.array(X)
        X /= np.linalg.norm(X, axis=1)[:, None]

        y = np.array(y).astype('int')

        clf = LinearSVC(C=1, class_weight='balanced',
                        random_state=RANDOM_STATE)

        clf.fit(X, y)

        full_specific_words = []
        for word in self.model.vocab:
            vector = [normalize(self[word])]
            if clf.predict(vector):
                full_specific_words.append(word)

        if not debug:
            return full_specific_words, clf

        return full_specific_words, clf, X, y
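
    # The linear SVM generalizes the seed set to the whole vocabulary, as in
    # the article. For example (with GenderBiasWE below, the Bolukbasi seed
    # words are the default):
    #
    #     specific_words, clf = gender_bias_we.learn_full_specific_words()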


class GenderBiasWE(BiasWordsEmbedding):
    PROFESSIONS_NAME = BOLUKBASI_DATA['gender']['professions_names']
    DEFINITIONAL_PAIRS = BOLUKBASI_DATA['gender']['definitional_pairs']
    SPECIFIC_SEED = set(BOLUKBASI_DATA['gender']['specific_seed'])
    SPECIFIC_FULL = set(BOLUKBASI_DATA['gender']['specific_full'])

    # TODO: in the code of the article, the last definitional pair
    # is not in the specific full
    SPECIFIC_FULL_WITH_DEFINITIONAL = (set.union(*map(set, DEFINITIONAL_PAIRS))
                                       | SPECIFIC_FULL)

    NEUTRAL_PROFESSIONS_NAME = list(set(PROFESSIONS_NAME)
                                    - set(SPECIFIC_FULL))

    def __init__(self, model, only_lower=True):
        super().__init__(model, only_lower)
        self._identify_direction('he', 'she',
                                 self.__class__.DEFINITIONAL_PAIRS,
                                 'pca')

        if not self.only_lower:
            self.SPECIFIC_FULL_WITH_DEFINITIONAL = generate_words_forms(self.SPECIFIC_FULL_WITH_DEFINITIONAL)  # pylint: disable=C0301

        self.NEUTRAL_WORDS = self._extract_neutral_words(self.__class__
                                                         .SPECIFIC_FULL_WITH_DEFINITIONAL)  # pylint: disable=C0301

    def calc_direct_bias(self, neutral_words='professions', c=None):
        if isinstance(neutral_words, str) and neutral_words == 'professions':
            return super().calc_direct_bias(
                self.__class__.NEUTRAL_PROFESSIONS_NAME, c)
        else:
            # pass c through as well, so the exponent is not silently dropped
            return super().calc_direct_bias(neutral_words, c)

    def debias(self, method='hard', neutral_words=None, equality_sets=None,
               inplace=True, verbose=False):
        if method in ['hard', 'neutralize']:
            if neutral_words is None:
                neutral_words = self.NEUTRAL_WORDS

        if method == 'hard' and equality_sets is None:
            equality_sets = self.__class__.DEFINITIONAL_PAIRS

            if not self.only_lower:
                assert all(len(equality_set) == 2
                           for equality_set in equality_sets), \
                    "currently supporting only equality pairs if only_lower is False"  # pylint: disable=C0301
                # TODO: refactor
                equality_sets = {(candidate1, candidate2)
                                 for word1, word2 in equality_sets
                                 for candidate1, candidate2
                                 in zip(generate_one_word_forms(word1),
                                        generate_one_word_forms(word2))}

        return super().debias(method, neutral_words, equality_sets,
                              inplace, verbose)

    def learn_full_specific_words(self, seed_specific_words='bolukbasi',
                                  max_non_specific_examples=None,
                                  debug=None):
        if seed_specific_words == 'bolukbasi':
            seed_specific_words = self.__class__.SPECIFIC_SEED

        return super().learn_full_specific_words(seed_specific_words,
                                                 max_non_specific_examples,
                                                 debug)
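
# Example end-to-end usage (a minimal sketch; 'word2vec.bin' is a
# hypothetical path to any word2vec-format embedding file):
#
#     from gensim.models.keyedvectors import KeyedVectors
#
#     model = KeyedVectors.load_word2vec_format('word2vec.bin', binary=True)
#     gender_bias_we = GenderBiasWE(model)
#
#     print(gender_bias_we.calc_direct_bias())  # direct bias of professions
#     gender_bias_we.debias('hard')             # neutralize + equalize
#     print(gender_bias_we.calc_direct_bias())  # should drop to near zero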