Passed
Push — master ( 7cd869...9ce428 )
by Shlomi
02:48 queued 53s
created

ethically.we.core.BiasWordsEmbedding.__copy__()   A

Complexity

Conditions 1

Size

Total Lines 6
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 6
nop 1
dl 0
loc 6
rs 10
c 0
b 0
f 0
1
import copy
2
import os
3
4
import matplotlib.pylab as plt
5
import numpy as np
6
import pandas as pd
7
import seaborn as sns
8
from gensim.models.keyedvectors import KeyedVectors
9
from pkg_resources import resource_filename
10
from sklearn.decomposition import PCA
11
from sklearn.svm import LinearSVC
12
from tqdm import tqdm
13
14
from ..consts import RANDOM_STATE
15
from .data import BOLUKBASI_DATA
16
from .utils import (
17
    cosine_similarity, generate_one_word_forms, generate_words_forms,
18
    normalize, project_reject_vector, project_vector, reject_vector,
19
    update_word_vector,
20
)
21
22
23
# Values accepted for ``method`` by ``BiasWordsEmbedding._identify_direction``.
DIRECTION_METHODS = ['single', 'sum', 'pca']
24
# Values accepted for ``method`` by ``BiasWordsEmbedding.debias``.
DEBIAS_METHODS = ['neutralize', 'hard', 'soft']
25
# Minimum explained-variance ratio required of the first principal component
# when the direction is identified with the 'pca' method.
FIRST_PC_THRESHOLD = 0.5
26
# Default cap on the number of non-specific words used as training examples
# in ``learn_full_specific_words``.
MAX_NON_SPECIFIC_EXAMPLES = 1000
27
28
29
class BiasWordsEmbedding:
30
31
    def __init__(self, model, only_lower=True):
        """Wrap a ``KeyedVectors`` model for bias measurement and debiasing.

        :param model: the word embedding; must be a ``KeyedVectors`` instance.
        :param only_lower: stored as-is on the instance.
        :raises TypeError: if ``model`` is not a ``KeyedVectors`` instance.
        """
        if not isinstance(model, KeyedVectors):
            message = 'model should be of type KeyedVectors, not {}'
            raise TypeError(message.format(type(model)))

        # No direction is known until ``_identify_direction`` is called.
        self.direction = None
        self.positive_end = None
        self.negative_end = None

        # TODO: write a unit test for when it is False
        self.only_lower = only_lower

        self.model = model
44
45
    def __copy__(self):
        """Return a copy that shares the same model object.

        A new instance of the same class is constructed around ``self.model``
        (the model itself is not copied), with the same ``only_lower``
        setting. The direction and its two end words are deep-copied so the
        copy can change them independently.
        """
        # Forward ``only_lower`` so the copy does not fall back to the
        # constructor default.
        bias_words_embedding = self.__class__(self.model, self.only_lower)
        bias_words_embedding.direction = copy.deepcopy(self.direction)
        bias_words_embedding.positive_end = copy.deepcopy(self.positive_end)
        bias_words_embedding.negative_end = copy.deepcopy(self.negative_end)
        return bias_words_embedding
51
52
    def __deepcopy__(self, memo):
        """Return a copy whose model is deep-copied as well.

        Starts from the shallow copy and then replaces its ``model`` with a
        deep copy. ``memo`` is accepted for the ``copy`` protocol but is not
        used.
        """
        duplicate = copy.copy(self)
        independent_model = copy.deepcopy(duplicate.model)
        duplicate.model = independent_model
        return duplicate
56
57
    def __getitem__(self, key):
58
        return self.model[key]
59
60
    def _is_direction_identified(self):
61
        if self.direction is None:
62
            raise RuntimeError('The direction was not identified'
63
                               ' for this {} instance'
64
                               .format(self.__class__.__name__))
65
66
    # There is a mistake in the article
67
    # it is written (section 5.1):
68
    # "To identify the gender subspace, we took the ten gender pair difference
69
    # vectors and computed its principal components (PCs)"
70
    # however in the source code:
71
    # https://github.com/tolga-b/debiaswe/blob/10277b23e187ee4bd2b6872b507163ef4198686b/debiaswe/we.py#L235-L245
72
    def _identify_subspace_by_pca(self, definitional_pairs, n_components):
        """Fit a PCA on the centered vectors of the definitional pairs.

        For every pair, both word vectors are normalized and their midpoint
        is subtracted from each; the two resulting offsets are added as rows.
        Returns the fitted ``PCA`` object.
        """
        rows = []

        for first_word, second_word in definitional_pairs:
            first = normalize(self[first_word])
            second = normalize(self[second_word])

            midpoint = (first + second) / 2

            rows.extend((first - midpoint, second - midpoint))

        pca = PCA(n_components=n_components)
        pca.fit(rows)

        return pca
88
89
    # TODO: add the SVD method from section 6 step 1
90
    # It seems there is a mistake there, I think it is the same as PCA
91
    # just with replacing it with SVD
92
    def _identify_direction(self, positive_end, negative_end,
                            definitional, method='pca'):
        """Compute the bias direction and store it on the instance.

        :param positive_end: word that should project highest on the direction.
        :param negative_end: word that should project lowest on the direction.
        :param definitional: for ``'single'``, the two words are taken from
            indexes 0 and 1; for ``'sum'`` and ``'pca'``, an iterable of
            word pairs.
        :param method: one of ``DIRECTION_METHODS``.
        :raises ValueError: if ``method`` is unknown or both ends are equal.
        :raises RuntimeError: with ``'pca'``, if the first principal component
            explains less than ``FIRST_PC_THRESHOLD`` of the variance.

        Sets ``self.direction``, ``self.positive_end`` and
        ``self.negative_end``.
        """
        if method not in DIRECTION_METHODS:
            raise ValueError('method should be one of {}, {} was given'.format(
                DIRECTION_METHODS, method))

        if positive_end == negative_end:
            raise ValueError('positive_end and negative_end'
                             ' should be different, and not the same "{}"'
                             .format(positive_end))

        if method == 'single':
            direction = normalize(normalize(self[definitional[0]])
                                  - normalize(self[definitional[1]]))

        elif method == 'sum':
            groups = list(zip(*definitional))

            group1_sum_vector = np.sum([self[word]
                                        for word in groups[0]], axis=0)
            group2_sum_vector = np.sum([self[word]
                                        for word in groups[1]], axis=0)

            diff_vector = (normalize(group1_sum_vector)
                           - normalize(group2_sum_vector))

            direction = normalize(diff_vector)

        else:
            # Only 'pca' remains after the validation above, so ``direction``
            # is bound on every path that reaches the code below.
            pca = self._identify_subspace_by_pca(definitional, 1)
            if pca.explained_variance_ratio_[0] < FIRST_PC_THRESHOLD:
                raise RuntimeError('The Explained variance'
                                   ' of the first principal component'
                                   ' should be at least {}, but it is {}'
                                   .format(FIRST_PC_THRESHOLD,
                                           pca.explained_variance_ratio_[0]))
            direction = pca.components_[0]

        # The sign of the computed direction is arbitrary (e.g. we cannot
        # control what the PCA returns); flip it if it is opposite, so that
        # positive_end projects higher than negative_end.
        positive_end_projection = cosine_similarity(self[positive_end],
                                                    direction)
        negative_end_projection = cosine_similarity(self[negative_end],
                                                    direction)

        if negative_end_projection > positive_end_projection:
            direction = -direction

        self.direction = direction
        self.positive_end = positive_end
        self.negative_end = negative_end
144
145
    def project_on_direction(self, word):
146
        self._is_direction_identified()
147
148
        vector = self[word]
149
        projection_score = self.model.cosine_similarities(self.direction,
150
                                                          [vector])[0]
151
        return projection_score
152
153
    def _calc_projection_scores(self, words):
154
        self._is_direction_identified()
155
156
        df = pd.DataFrame({'word': words})
157
158
        # TODO: maybe using cosine_similarities on all the vectors?
159
        # it might be faster
160
        df['projection'] = df['word'].apply(self.project_on_direction)
161
        df = df.sort_values('projection', ascending=False)
162
163
        return df
164
165
    def plot_projection_scores(self, words,
                               ax=None, axis_projection_step=None):
        """Draw a horizontal bar plot of the projection scores of ``words``.

        :param words: the words to plot.
        :param ax: matplotlib axes to draw on; a new figure with a single
            axes is created when ``None``.
        :param axis_projection_step: spacing of the x-axis ticks
            (``0.1`` when ``None``).
        :raises RuntimeError: if no direction was identified.
        """
        self._is_direction_identified()

        projections_df = self._calc_projection_scores(words)
        projections_df['projection'] = projections_df['projection'].round(2)

        if ax is None:
            _, ax = plt.subplots(1)

        if axis_projection_step is None:
            axis_projection_step = 0.1

        # Shift the scores by 0.5 before they are passed to the colormap.
        cmap = plt.get_cmap('RdBu')
        projections_df['color'] = ((projections_df['projection'] + 0.5)
                                   .apply(cmap))

        most_extream_projection = (projections_df['projection']
                                   .abs()
                                   .max()
                                   .round(1))

        # Draw and decorate the requested axes explicitly; going through the
        # pyplot state machine would target whichever axes is current and
        # ignore a caller-supplied ``ax``.
        sns.barplot(x='projection', y='word', data=projections_df,
                    palette=projections_df['color'], ax=ax)

        ax.set_xticks(np.arange(-most_extream_projection,
                                most_extream_projection,
                                axis_projection_step))
        ax.set_title('← {} {} {} →'.format(self.negative_end,
                                           ' ' * 20,
                                           self.positive_end))

        ax.set_xlabel('Direction Projection')
        ax.set_ylabel('Words')
198
199
    def calc_direct_bias(self, neutral_words, c=None):
200
        if c is None:
201
            c = 1
202
203
        projections = self._calc_projection_scores(neutral_words)['projection']
204
        direct_bias_terms = np.abs(projections) ** c
205
        direct_bias = direct_bias_terms.sum() / len(neutral_words)
206
207
        return direct_bias
208
209
    def calc_indirect_bias(self, word1, word2):
        """Also known in the article as PairBias.

        Compares the inner product of the two normalized word vectors with
        the cosine similarity of their rejections from the direction, and
        returns the relative difference ``(inner - perpendicular) / inner``.

        :raises RuntimeError: if no direction was identified.
        """
        self._is_direction_identified()

        unit1 = normalize(self[word1])
        unit2 = normalize(self[word2])

        # Similarity of the parts of both vectors left after rejecting
        # them from the direction.
        perpendicular_similarity = cosine_similarity(
            reject_vector(unit1, self.direction),
            reject_vector(unit2, self.direction))

        inner_product = unit1 @ unit2

        return (inner_product - perpendicular_similarity) / inner_product
226
227
    def _extract_neutral_words(self, specific_words):
228
        extended_specific_words = set()
229
230
        # because or specific_full data was trained on partial words embedding
231
        for word in specific_words:
232
            extended_specific_words.add(word)
233
            extended_specific_words.add(word.lower())
234
            extended_specific_words.add(word.upper())
235
            extended_specific_words.add(word.title())
236
237
        neutral_words = [word for word in self.model.vocab
238
                         if word not in extended_specific_words]
239
240
        return neutral_words
241
242
    def _neutralize(self, neutral_words, verbose=False):
        """Replace each neutral word's vector by its rejection from the
        direction, updating the model in place.

        :param neutral_words: words to neutralize.
        :param verbose: wrap the iteration in a ``tqdm`` progress bar.
        :raises RuntimeError: if no direction was identified.
        """
        self._is_direction_identified()

        words_iter = tqdm(neutral_words) if verbose else iter(neutral_words)

        for word in words_iter:
            update_word_vector(self.model, word,
                               reject_vector(self[word], self.direction))

        self.model.init_sims(replace=True)
256
257
    def _equalize(self, equality_sets):
        """Equalize the words of every equality set, updating the model
        in place.

        For each set, the normalized word vectors are averaged into a center.
        Every word is then replaced by the center's rejection from the
        direction plus a scaled, normalized version of the word's own
        projection relative to the center's projection.

        :param equality_sets: iterable of word collections to equalize.
        :raises RuntimeError: if no direction was identified.
        """
        self._is_direction_identified()

        for equality_set_words in equality_sets:
            equality_set_vectors = [normalize(self[word])
                                    for word in equality_set_words]
            center = np.mean(equality_set_vectors, axis=0)
            (projected_center,
             rejected_center) = project_reject_vector(center,
                                                      self.direction)

            # Depends only on the set's center, so compute it once per set
            # instead of once per word.
            scaling = np.sqrt(1 - np.linalg.norm(rejected_center)**2)

            for word, vector in zip(equality_set_words, equality_set_vectors):
                projected_vector = project_vector(vector, self.direction)

                projected_part = normalize(projected_vector - projected_center)

                # TODO - in the code it is different - why?
                # equalized_vector = rejected_center + scaling * self.direction
                # https://github.com/tolga-b/debiaswe/blob/10277b23e187ee4bd2b6872b507163ef4198686b/debiaswe/debias.py#L36-L37
                equalized_vector = rejected_center + scaling * projected_part

                update_word_vector(self.model, word, equalized_vector)

        self.model.init_sims(replace=True)
280
281
    def debias(self, method='hard', neutral_words=None, equality_sets=None,
               inplace=True, verbose=False):
        """Debias the embedding with the chosen method.

        :param method: one of ``DEBIAS_METHODS``. ``'neutralize'`` runs the
            neutralize step only; ``'hard'`` runs neutralize and then equalize.
        :param neutral_words: words passed to the neutralize step.
        :param equality_sets: word sets passed to the equalize step.
        :param inplace: modify this instance and return ``None``; otherwise
            work on a deep copy and return it.
        :param verbose: print the name of each step and show progress.
        :raises ValueError: if ``method`` is not one of ``DEBIAS_METHODS``.
        """
        # pylint: disable=W0212
        # Validate first so an invalid method does not pay for a deep copy.
        if method not in DEBIAS_METHODS:
            raise ValueError('method should be one of {}, {} was given'.format(
                DEBIAS_METHODS, method))

        bias_words_embedding = self if inplace else copy.deepcopy(self)

        if method in ['hard', 'neutralize']:
            if verbose:
                print('Neutralize...')
            bias_words_embedding._neutralize(neutral_words, verbose)

        if method == 'hard':
            if verbose:
                print('Equalize...')
            bias_words_embedding._equalize(equality_sets)

        # NOTE(review): 'soft' passes the validation above but no branch
        # handles it, so it currently leaves the embedding unchanged.

        if inplace:
            return None
        return bias_words_embedding
307
308
    def evaluate_words_embedding(self, verbose=False):
        """Evaluate the model on word pairs and analogies and print a report.

        Uses the packaged ``data/evaluation/wordsim353.tsv`` and
        ``data/evaluation/questions-words.txt`` files with the model's
        ``evaluate_word_pairs`` and ``evaluate_word_analogies`` methods.
        The report is always printed; ``verbose`` only adds progress
        messages. Returns ``None``.
        """
        if verbose:
            print('Evaluate word pairs...')
        word_pairs_path = resource_filename(__name__,
                                            os.path.join('data',
                                                         'evaluation',
                                                         'wordsim353.tsv'))
        word_pairs_result = self.model.evaluate_word_pairs(word_pairs_path)

        if verbose:
            print('Evaluate analogies...')
        analogies_path = resource_filename(__name__,
                                           os.path.join('data',
                                                        'evaluation',
                                                        'questions-words.txt'))
        analogies_result = self.model.evaluate_word_analogies(analogies_path)

        if verbose:
            print()
        print('From Gensim')
        print()
        print('-' * 30)
        print()
        print('Word Pairs Result - WordSimilarity-353:')
        print('~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~')
        print('Pearson correlation coefficient:', word_pairs_result[0])
        # The literals are concatenated, so each continuation starts with a
        # space to keep the words of the label apart.
        print('Spearman rank-order correlation coefficient'
              ' between the similarities from the dataset'
              ' and the similarities produced by the model itself:',
              word_pairs_result[1])
        print('Ratio of pairs with unknown words:', word_pairs_result[2])
        print()
        print('-' * 30)
        print()
        print('Analogies Result')
        print('~~~~~~~~~~~~~~~~')
        print('Overall evaluation score:', analogies_result[0])
345
346
    def learn_full_specific_words(self, seed_specific_words,
                                  max_non_specific_examples=None, debug=None):
        """Train a linear SVM on seed words and label the whole vocabulary.

        Every vocabulary word found in ``seed_specific_words`` is a positive
        example; the first ``max_non_specific_examples`` other vocabulary
        words are negative examples. The vectors are normalized to unit
        length before fitting.

        :param seed_specific_words: collection of known specific words.
        :param max_non_specific_examples: cap on negative examples
            (``MAX_NON_SPECIFIC_EXAMPLES`` when ``None``).
        :param debug: when truthy, also return the training data.
        :return: ``(full_specific_words, clf)``, or
            ``(full_specific_words, clf, X, y)`` when ``debug`` is truthy.
        """
        debug = False if debug is None else debug

        if max_non_specific_examples is None:
            max_non_specific_examples = MAX_NON_SPECIFIC_EXAMPLES

        examples = []
        non_specific_seen = 0

        for word in self.model.vocab:
            is_specific = word in seed_specific_words

            if is_specific:
                examples.append((self[word], is_specific))
                continue

            # Non-specific words are counted even after the cap is reached.
            non_specific_seen += 1
            if non_specific_seen <= max_non_specific_examples:
                examples.append((self[word], is_specific))

        # Fixed seed so the shuffled training order is reproducible.
        np.random.seed(RANDOM_STATE)
        np.random.shuffle(examples)

        X, y = zip(*examples)

        X = np.array(X)
        X /= np.linalg.norm(X, axis=1)[:, None]

        y = np.array(y).astype('int')

        clf = LinearSVC(C=1, class_weight='balanced',
                        random_state=RANDOM_STATE)

        clf.fit(X, y)

        full_specific_words = []
        for word in self.model.vocab:
            sample = [normalize(self[word])]
            if clf.predict(sample):
                full_specific_words.append(word)

        if debug:
            return full_specific_words, clf, X, y

        return full_specific_words, clf
393
394
395
class GenderBiasWE(BiasWordsEmbedding):
    """Gender bias measurement and debiasing using the Bolukbasi data.

    On construction the direction is identified with the ``'pca'`` method
    over ``DEFINITIONAL_PAIRS``, with ``'he'`` as the positive end and
    ``'she'`` as the negative end, and ``NEUTRAL_WORDS`` is extracted from
    the model's vocabulary.
    """

    PROFESSIONS_NAME = BOLUKBASI_DATA['gender']['professions_names']
    DEFINITIONAL_PAIRS = BOLUKBASI_DATA['gender']['definitional_pairs']
    SPECIFIC_SEED = set(BOLUKBASI_DATA['gender']['specific_seed'])
    SPECIFIC_FULL = set(BOLUKBASI_DATA['gender']['specific_full'])

    # TODO: in the code of the article, the last definitional pair
    # is not in the specific full
    SPECIFIC_FULL_WITH_DEFINITIONAL = (set.union(*map(set, DEFINITIONAL_PAIRS))
                                       | SPECIFIC_FULL)

    # Profession names that are not in the specific-words set.
    NEUTRAL_PROFESSIONS_NAME = list(set(PROFESSIONS_NAME)
                                    - set(SPECIFIC_FULL))

    def __init__(self, model, only_lower=True):
        """Wrap ``model`` and identify the gender direction.

        :raises TypeError: if ``model`` is not a ``KeyedVectors`` instance.
        :raises RuntimeError: if the first principal component explains
            less than ``FIRST_PC_THRESHOLD`` of the variance.
        """
        super().__init__(model, only_lower)
        self._identify_direction('he', 'she',
                                 self.__class__.DEFINITIONAL_PAIRS,
                                 'pca')

        if not self.only_lower:
            # NOTE(review): this sets an instance attribute, while the call
            # below passes the class attribute - confirm that is intended.
            self.SPECIFIC_FULL_WITH_DEFINITIONAL = generate_words_forms(self.SPECIFIC_FULL_WITH_DEFINITIONAL)  # pylint: disable=C0301

        self.NEUTRAL_WORDS = self._extract_neutral_words(self.__class__
                                                         .SPECIFIC_FULL_WITH_DEFINITIONAL)  # pylint: disable=C0301

    def calc_direct_bias(self, neutral_words='professions', c=None):
        """Return the direct bias, by default over the neutral professions.

        :param neutral_words: the string ``'professions'`` selects
            ``NEUTRAL_PROFESSIONS_NAME``; any other value is used as the
            words themselves.
        :param c: exponent forwarded to the base implementation.
        """
        if isinstance(neutral_words, str) and neutral_words == 'professions':
            neutral_words = self.__class__.NEUTRAL_PROFESSIONS_NAME

        # Forward ``c`` for caller-supplied words too, not only the default.
        return super().calc_direct_bias(neutral_words, c)

    def debias(self, method='hard', neutral_words=None, equality_sets=None,
               inplace=True, verbose=False):
        """Debias with gender-specific defaults for the optional inputs.

        ``neutral_words`` defaults to ``self.NEUTRAL_WORDS`` for the
        ``'hard'`` and ``'neutralize'`` methods. ``equality_sets`` defaults
        to ``DEFINITIONAL_PAIRS`` for ``'hard'``; when ``only_lower`` is
        false those pairs are expanded with the word forms produced by
        ``generate_one_word_forms``. All other behaviour is that of the
        base class.
        """
        if method in ['hard', 'neutralize']:
            if neutral_words is None:
                neutral_words = self.NEUTRAL_WORDS

        if method == 'hard' and equality_sets is None:
            equality_sets = self.__class__.DEFINITIONAL_PAIRS

            if not self.only_lower:
                assert all(len(equality_set) == 2
                           for equality_set in equality_sets), "currently supporting only equality pairs if only_lower is False"  # pylint: disable=C0301
                # TODO: refactor
                equality_sets = {(candidate1, candidate2)
                                 for word1, word2 in equality_sets
                                 for candidate1, candidate2 in zip(generate_one_word_forms(word1),
                                                                   generate_one_word_forms(word2))}  # pylint: disable=C0301

        return super().debias(method, neutral_words, equality_sets,
                              inplace, verbose)

    def learn_full_specific_words(self, seed_specific_words='bolukbasi',
                                  max_non_specific_examples=None,
                                  debug=None):
        """Learn the specific words, by default from ``SPECIFIC_SEED``.

        The string ``'bolukbasi'`` selects ``SPECIFIC_SEED``; any other
        value is forwarded unchanged to the base implementation.
        """
        if seed_specific_words == 'bolukbasi':
            seed_specific_words = self.__class__.SPECIFIC_SEED

        return super().learn_full_specific_words(seed_specific_words,
                                                 max_non_specific_examples,
                                                 debug)
458