import copy
import os
import warnings

import matplotlib.pylab as plt
import numpy as np
import pandas as pd
import seaborn as sns
from gensim.models.keyedvectors import KeyedVectors
from pkg_resources import resource_filename
from sklearn.decomposition import PCA
from sklearn.svm import LinearSVC
from tqdm import tqdm

from ..consts import RANDOM_STATE
from .data import BOLUKBASI_DATA
from .utils import (
    cosine_similarity, generate_one_word_forms, generate_words_forms,
    normalize, project_reject_vector, project_vector, reject_vector,
    update_word_vector,
)


DIRECTION_METHODS = ['single', 'sum', 'pca']
DEBIAS_METHODS = ['neutralize', 'hard', 'soft']
FIRST_PC_THRESHOLD = 0.5
MAX_NON_SPECIFIC_EXAMPLES = 1000


class BiasWordsEmbedding:
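    """Measure and mitigate a bias direction in a words embedding.

    Wraps a gensim KeyedVectors model; once a bias direction is
    identified, the instance can project words on it, compute the
    direct and indirect bias measures, and debias the embedding,
    following Bolukbasi et al. (2016).
    """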

    def __init__(self, model, only_lower=True):
        if not isinstance(model, KeyedVectors):
            raise TypeError('model should be of type KeyedVectors, not {}'
                            .format(type(model)))

        self.model = model

        # TODO: write a unit test for when it is False
        self.only_lower = only_lower

        self.direction = None
        self.positive_end = None
        self.negative_end = None

    def __copy__(self):
        bias_words_embedding = self.__class__(self.model)
        bias_words_embedding.direction = copy.deepcopy(self.direction)
        bias_words_embedding.positive_end = copy.deepcopy(self.positive_end)
        bias_words_embedding.negative_end = copy.deepcopy(self.negative_end)
        return bias_words_embedding

    def __deepcopy__(self, memo):
        bias_words_embedding = copy.copy(self)
        bias_words_embedding.model = copy.deepcopy(bias_words_embedding.model)
        return bias_words_embedding

    def __getitem__(self, key):
        return self.model[key]

    def __contains__(self, item):
        return item in self.model

    def _is_direction_identified(self):
        if self.direction is None:
            raise RuntimeError('The direction was not identified'
                               ' for this {} instance'
                               .format(self.__class__.__name__))

    # Note: there is a discrepancy in the article. Section 5.1 says:
    # "To identify the gender subspace, we took the ten gender pair
    # difference vectors and computed its principal components (PCs)",
    # but the source code performs PCA on the differences of each pair's
    # words from the pair's center (as done here):
    # https://github.com/tolga-b/debiaswe/blob/10277b23e187ee4bd2b6872b507163ef4198686b/debiaswe/we.py#L235-L245
    def _identify_subspace_by_pca(self, definitional_pairs, n_components):
        matrix = []

        for word1, word2 in definitional_pairs:
            vector1 = normalize(self[word1])
            vector2 = normalize(self[word2])

            center = (vector1 + vector2) / 2

            matrix.append(vector1 - center)
            matrix.append(vector2 - center)

        pca = PCA(n_components=n_components)
        pca.fit(matrix)

        return pca

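    # Illustration (with hypothetical pairs): for [('he', 'she'),
    # ('man', 'woman')], the method above fits PCA on the four differences
    # of the normalized vectors from their pair's center; its first
    # principal component approximates the bias direction.
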
    # TODO: add the SVD method from section 6 step 1
    # It seems there is a mistake there; I think it is the same as PCA,
    # just replacing it with SVD
    def _identify_direction(self, positive_end, negative_end,
                            definitional, method='pca'):
        if method not in DIRECTION_METHODS:
            raise ValueError('method should be one of {}, {} was given'.format(
                DIRECTION_METHODS, method))

        if positive_end == negative_end:
            raise ValueError('positive_end and negative_end'
                             ' should be different, and not the same "{}"'
                             .format(positive_end))

        direction = None

        if method == 'single':
            direction = normalize(normalize(self[definitional[0]])
                                  - normalize(self[definitional[1]]))

        elif method == 'sum':
            groups = list(zip(*definitional))

            group1_sum_vector = np.sum([self[word]
                                        for word in groups[0]], axis=0)
            group2_sum_vector = np.sum([self[word]
                                        for word in groups[1]], axis=0)

            diff_vector = (normalize(group1_sum_vector)
                           - normalize(group2_sum_vector))

            direction = normalize(diff_vector)

        elif method == 'pca':
            pca = self._identify_subspace_by_pca(definitional, 1)
            if pca.explained_variance_ratio_[0] < FIRST_PC_THRESHOLD:
                raise RuntimeError('The explained variance'
                                   ' of the first principal component should be'
                                   ' at least {}, but it is {}'
                                   .format(FIRST_PC_THRESHOLD,
                                           pca.explained_variance_ratio_[0]))
            direction = pca.components_[0]

        # flip the direction if it came out opposite
        # (e.g., we cannot control the sign of what the PCA returns)
        ends_diff_projection = cosine_similarity((self[positive_end]
                                                  - self[negative_end]),
                                                 direction)
        if ends_diff_projection < 0:
            direction = -direction  # pylint: disable=invalid-unary-operand-type

        self.direction = direction
        self.positive_end = positive_end
        self.negative_end = negative_end

    def project_on_direction(self, word):
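        """Return the projection of the word on the bias direction,
        i.e. its cosine similarity with the direction vector."""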
        self._is_direction_identified()

        vector = self[word]
        projection_score = self.model.cosine_similarities(self.direction,
                                                          [vector])[0]
        return projection_score

    def _calc_projection_scores(self, words):
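        """Return a DataFrame of the words and their projections on the
        bias direction, sorted from the most positive to the most
        negative projection."""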
        self._is_direction_identified()

        df = pd.DataFrame({'word': words})

        # TODO: consider computing cosine_similarities on all the vectors
        # at once; it might be faster
        df['projection'] = df['word'].apply(self.project_on_direction)
        df = df.sort_values('projection', ascending=False)

        return df

    def plot_projection_scores(self, words,
                               ax=None, axis_projection_step=None):
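        """Plot a bar chart of the projections of the words on the bias
        direction, using the RdBu colormap (negative projections in red,
        positive in blue)."""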
        self._is_direction_identified()

        projections_df = self._calc_projection_scores(words)
        projections_df['projection'] = projections_df['projection'].round(2)

        if ax is None:
            _, ax = plt.subplots(1)

        if axis_projection_step is None:
            axis_projection_step = 0.1

        cmap = plt.get_cmap('RdBu')
        projections_df['color'] = ((projections_df['projection'] + 0.5)
                                   .apply(cmap))

        most_extreme_projection = (projections_df['projection']
                                   .abs()
                                   .max()
                                   .round(1))

        sns.barplot(x='projection', y='word', data=projections_df,
                    palette=projections_df['color'])

        plt.xticks(np.arange(-most_extreme_projection, most_extreme_projection,
                             axis_projection_step))
        plt.title('← {} {} {} →'.format(self.negative_end,
                                        ' ' * 20,
                                        self.positive_end))

        plt.xlabel('Direction Projection')
        plt.ylabel('Words')

    def calc_direct_bias(self, neutral_words, c=None):
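        """Compute the DirectBias measure over the neutral words:

        DirectBias_c = (1 / |N|) * sum(|cos(w, direction)| ** c for w in N)

        following Bolukbasi et al. (2016).
        """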
        if c is None:
            c = 1

        projections = self._calc_projection_scores(neutral_words)['projection']
        direct_bias_terms = np.abs(projections) ** c
        direct_bias = direct_bias_terms.sum() / len(neutral_words)

        return direct_bias

    def calc_indirect_bias(self, word1, word2):
        """Compute the indirect bias between two words.

        Also known in the article as PairBias:
        beta(w, v) = (w.v - cos(w_perp, v_perp)) / w.v,
        where w_perp and v_perp are the rejections of the normalized
        word vectors on the bias direction.
        """
        self._is_direction_identified()

        vector1 = normalize(self[word1])
        vector2 = normalize(self[word2])

        perpendicular_vector1 = reject_vector(vector1, self.direction)
        perpendicular_vector2 = reject_vector(vector2, self.direction)

        inner_product = vector1 @ vector2
        perpendicular_similarity = cosine_similarity(perpendicular_vector1,
                                                     perpendicular_vector2)

        indirect_bias = ((inner_product - perpendicular_similarity)
                         / inner_product)
        return indirect_bias

    def _extract_neutral_words(self, specific_words):
        extended_specific_words = set()

        # because our specific_full data was trained
        # on a partial words embedding
        for word in specific_words:
            extended_specific_words.add(word)
            extended_specific_words.add(word.lower())
            extended_specific_words.add(word.upper())
            extended_specific_words.add(word.title())

        neutral_words = [word for word in self.model.vocab
                         if word not in extended_specific_words]

        return neutral_words

    def _neutralize(self, neutral_words, verbose=False):
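        """Zero out the component of every neutral word's vector along
        the bias direction, and re-normalize the embedding."""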
        self._is_direction_identified()

        if verbose:
            neutral_words_iter = tqdm(neutral_words)
        else:
            neutral_words_iter = iter(neutral_words)

        for word in neutral_words_iter:
            neutralized_vector = reject_vector(self[word],
                                               self.direction)
            update_word_vector(self.model, word, neutralized_vector)

        self.model.init_sims(replace=True)

    def _equalize(self, equality_sets):
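        """Center every equality set and make its words differ only along
        the bias direction (the equalize step of hard debiasing)."""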
        for equality_set_words in equality_sets:
            equality_set_vectors = [normalize(self[word])
                                    for word in equality_set_words]
            center = np.mean(equality_set_vectors, axis=0)
            (projected_center,
             rejected_center) = project_reject_vector(center,
                                                      self.direction)

            for word, vector in zip(equality_set_words, equality_set_vectors):
                projected_vector = project_vector(vector, self.direction)

                projected_part = normalize(projected_vector - projected_center)
                scaling = np.sqrt(1 - np.linalg.norm(rejected_center)**2)

                # TODO: in the original code it is different - why?
                # equalized_vector = rejected_center + scaling * self.direction
                # https://github.com/tolga-b/debiaswe/blob/10277b23e187ee4bd2b6872b507163ef4198686b/debiaswe/debias.py#L36-L37
                equalized_vector = rejected_center + scaling * projected_part

                update_word_vector(self.model, word, equalized_vector)

        self.model.init_sims(replace=True)

    def debias(self, method='hard', neutral_words=None, equality_sets=None,
               inplace=True, verbose=False):
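        """Debias the words embedding following Bolukbasi et al. (2016).

        'neutralize' removes the bias direction component from the
        neutral words; 'hard' additionally equalizes the equality sets
        ('soft' is accepted but not implemented here). With
        inplace=False, a debiased deep copy is returned instead of
        modifying this instance.
        """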
        # pylint: disable=W0212
        if inplace:
            bias_words_embedding = self
        else:
            bias_words_embedding = copy.deepcopy(self)

        if method not in DEBIAS_METHODS:
            raise ValueError('method should be one of {}, {} was given'.format(
                DEBIAS_METHODS, method))

        if method in ['hard', 'neutralize']:
            if verbose:
                print('Neutralize...')
            bias_words_embedding._neutralize(neutral_words, verbose)

        if method == 'hard':
            if verbose:
                print('Equalize...')
            bias_words_embedding._equalize(equality_sets)

        if inplace:
            return None
        else:
            return bias_words_embedding

    def evaluate_words_embedding(self, verbose=False):
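        """Print gensim's standard evaluations of the words embedding:
        word pairs similarity (WordSimilarity-353) and analogies
        (Google's questions-words)."""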
        with warnings.catch_warnings():
            warnings.simplefilter('ignore', category=FutureWarning)

            if verbose:
                print('Evaluate word pairs...')
            word_pairs_path = resource_filename(__name__,
                                                os.path.join('data',
                                                             'evaluation',
                                                             'wordsim353.tsv'))
            word_pairs_result = self.model.evaluate_word_pairs(word_pairs_path)

            if verbose:
                print('Evaluate analogies...')
            analogies_path = resource_filename(__name__,
                                               os.path.join('data',
                                                            'evaluation',
                                                            'questions-words.txt'))  # pylint: disable=C0301
            analogies_result = self.model.evaluate_word_analogies(analogies_path)  # pylint: disable=C0301

        if verbose:
            print()
        print('From Gensim')
        print()
        print('-' * 30)
        print()
        print('Word Pairs Result - WordSimilarity-353:')
        print('~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~')
        print('Pearson correlation coefficient:', word_pairs_result[0])
        print('Spearman rank-order correlation coefficient'
              ' between the similarities from the dataset'
              ' and the similarities produced by the model itself:',
              word_pairs_result[1])
        print('Ratio of pairs with unknown words:', word_pairs_result[2])
        print()
        print('-' * 30)
        print()
        print('Analogies Result')
        print('~~~~~~~~~~~~~~~~')
        print('Overall evaluation score:', analogies_result[0])

    def learn_full_specific_words(self, seed_specific_words,
                                  max_non_specific_examples=None, debug=None):
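        """Learn the full set of specific (non-neutral) words.

        Train a linear SVM with the seed specific words as positive
        examples against up to max_non_specific_examples other vocabulary
        words, and label the whole vocabulary with it, following
        Bolukbasi et al. (2016). With debug=True, the training data
        (X, y) is returned as well.
        """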

        if debug is None:
            debug = False

        if max_non_specific_examples is None:
            max_non_specific_examples = MAX_NON_SPECIFIC_EXAMPLES

        data = []
        non_specific_example_count = 0

        for word in self.model.vocab:
            is_specific = word in seed_specific_words

            if not is_specific:
                non_specific_example_count += 1
                if non_specific_example_count <= max_non_specific_examples:
                    data.append((self[word], is_specific))
            else:
                data.append((self[word], is_specific))

        np.random.seed(RANDOM_STATE)
        np.random.shuffle(data)

        X, y = zip(*data)

        X = np.array(X)
        X /= np.linalg.norm(X, axis=1)[:, None]

        y = np.array(y).astype('int')

        clf = LinearSVC(C=1, class_weight='balanced',
                        random_state=RANDOM_STATE)

        clf.fit(X, y)

        full_specific_words = []
        for word in self.model.vocab:
            vector = [normalize(self[word])]
            if clf.predict(vector):
                full_specific_words.append(word)

        if not debug:
            return full_specific_words, clf

        return full_specific_words, clf, X, y


class GenderBiasWE(BiasWordsEmbedding):
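    """Measure and mitigate the gender bias in a words embedding.

    Preloaded with the data of Bolukbasi et al. (2016) (definitional
    pairs, professions names and specific words); the gender direction
    is identified with PCA over the 'he'-'she' definitional pairs on
    initialization.
    """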
    PROFESSIONS_NAME = BOLUKBASI_DATA['gender']['professions_names']
    DEFINITIONAL_PAIRS = BOLUKBASI_DATA['gender']['definitional_pairs']
    SPECIFIC_SEED = set(BOLUKBASI_DATA['gender']['specific_seed'])
    SPECIFIC_FULL = set(BOLUKBASI_DATA['gender']['specific_full'])

    # TODO: in the code of the article, the last definitional pair
    # is not in the specific full
    SPECIFIC_FULL_WITH_DEFINITIONAL = (set.union(*map(set, DEFINITIONAL_PAIRS))
                                       | SPECIFIC_FULL)

    NEUTRAL_PROFESSIONS_NAME = list(set(PROFESSIONS_NAME)
                                    - set(SPECIFIC_FULL))

    def __init__(self, model, only_lower=True):
        super().__init__(model, only_lower)
        self._identify_direction('he', 'she',
                                 self.__class__.DEFINITIONAL_PAIRS,
                                 'pca')

        if not self.only_lower:
            self.SPECIFIC_FULL_WITH_DEFINITIONAL = generate_words_forms(self.SPECIFIC_FULL_WITH_DEFINITIONAL)  # pylint: disable=C0301

        self.NEUTRAL_WORDS = self._extract_neutral_words(self.__class__
                                                         .SPECIFIC_FULL_WITH_DEFINITIONAL)  # pylint: disable=C0301

    def calc_direct_bias(self, neutral_words='professions', c=None):
        if isinstance(neutral_words, str) and neutral_words == 'professions':
            return super().calc_direct_bias(
                self.__class__.NEUTRAL_PROFESSIONS_NAME, c)
        else:
            return super().calc_direct_bias(neutral_words)

    def debias(self, method='hard', neutral_words=None, equality_sets=None,
               inplace=True, verbose=False):
        if method in ['hard', 'neutralize']:
            if neutral_words is None:
                neutral_words = self.NEUTRAL_WORDS

        if method == 'hard' and equality_sets is None:
            equality_sets = self.__class__.DEFINITIONAL_PAIRS

            if not self.only_lower:
                assert all(len(equality_set) == 2
                           for equality_set in equality_sets), "currently supporting only equality pairs if only_lower is False"  # pylint: disable=C0301
                # TODO: refactor
                equality_sets = {(candidate1, candidate2)
                                 for word1, word2 in equality_sets
                                 for candidate1, candidate2 in zip(generate_one_word_forms(word1),
                                                                   generate_one_word_forms(word2))}  # pylint: disable=C0301

        return super().debias(method, neutral_words, equality_sets,
                              inplace, verbose)

    def learn_full_specific_words(self, seed_specific_words='bolukbasi',
                                  max_non_specific_examples=None,
                                  debug=None):
        if seed_specific_words == 'bolukbasi':
            seed_specific_words = self.__class__.SPECIFIC_SEED

        return super().learn_full_specific_words(seed_specific_words,
                                                 max_non_specific_examples,
                                                 debug)