select_best_alphabetical_word()   F

Complexity

Conditions 16

Size

Total Lines 39

Duplication

Lines 18
Ratio 46.15 %

Importance

Changes 1
Bugs 0 Features 1
Metric                  Value
cc (conditions)         16
c (changes)             1
b (bugs)                0
f (features)            1
dl (duplicated lines)   18
loc (total lines)       39
rs                      2.7326

How to fix: Complexity

Complex classes like select_best_alphabetical_word() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
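In this file the main offender is the duplicated tail shared by find_correct_case() and select_best_alphabetical_word() in the listing below: the ASCII normalisation followed by a min/max selection. A minimal sketch of one possible extraction, closer to an extract-helper refactoring than a full Extract Class; the helper names _word_to_comparable and _pick_by_alphabetical_value are hypothetical and not part of the project:

def _word_to_comparable(term):
    """Map a term to its list of ASCII codes, zeroing every char outside a-zA-Z."""
    return [code if 65 <= code <= 90 or 97 <= code <= 122 else 0
            for code in (ord(ch) for ch in term)]


def _pick_by_alphabetical_value(word_list, use_min):
    """Return the candidates whose comparable value is the minimum (or maximum)."""
    table = {term: _word_to_comparable(term) for term in word_list}
    target = min(table.values()) if use_min else max(table.values())
    return [term for term, value in table.items() if value == target]

Each call site would then keep only its own tie-breaking (raising ValueError in find_correct_case(), falling back to select_by_hash() in select_best_alphabetical_word()), which removes the duplicated branch that drives both the condition count and the 46.15 % duplication ratio.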

"""Utilities for inline denoising

.. Authors:
    Philippe Dessauw
    [email protected]

.. Sponsor:
    Alden Dima
    [email protected]
    Information Systems Group
    Software and Systems Division
    Information Technology Laboratory
    National Institute of Standards and Technology
    http://www.nist.gov/itl/ssd/is
"""
from hashlib import md5
from collections import Counter
from copy import deepcopy
from math import log
from operator import add
from nltk.metrics.distance import edit_distance
import operator
from denoiser.models.inline.hashing import anagram_hash, ocr_key_hash
from denoiser.models.inline.ranking import rate_anagram, rate_ocr_key, rate_bigram


def init_correction_map(token, dictionary):
    """Initialize the correction dictionary

    Parameters:
        token (:func:`str`): Cleaned token
        dictionary (:func:`dict`): Dictionary structure

    Returns:
        :func:`dict` or :data:`None` - Correction map
    """
    if token is None:
        return None

    if len(token) <= 2 or token.lower() in dictionary:
        return {token: 1}

    return {}


def generate_alphabet_from_word(word):
    """Generate anagram hash for all chars in a word

    Parameters:
        word (:func:`str`): Word to generate hash
    Returns:
        set - Set of hashes
    """
    word = " "+word+" "
    chars = [char for char in word]  # Getting letters from the word
    chars += map(add, chars[:-1], chars[1:])  # Adding bigrams to the list

    # Computing hash of items and add 0 to the list
    return set([0] + [anagram_hash(c) for c in set(chars)])


def select_anagrams(token, structures):
    """Select possible anagrams for a given token

    Parameters:
        token (:func:`str`): Cleaned token
        structures (:func:`dict`): Datastructures from file

    Returns:
        :func:`dict` - Possible anagrams (keys) along with their score (values)
    """
    anagrams = {}
    focus_alphabet = generate_alphabet_from_word(token[1])
    token_hash = anagram_hash(token)

    hash_list = []
    for c in structures["alphabet"]:
        for f in focus_alphabet:
            hash_list.append(token_hash + c - f)

    hash_counter = Counter(hash_list)  # Counting retrieval occurrences

    for h in set(hash_counter.keys()).intersection(set(structures["anagrams"].keys())):
        count = hash_counter[h]
        anag_list = [anag for anag in structures["anagrams"][h] if edit_distance(anag, token) <= 3]

        for anag in anag_list:
            anag_score = rate_anagram(structures["occurence_map"], token, anag, count)

            if anag_score > 0:
                anagrams[anag] = anag_score

    return anagrams


def select_ocrsims(token, structures):
    """Select similar words for a given token

    Parameters:
        token (:func:`str`): Cleaned token
        structures (:func:`dict`): Datastructures from file

    Returns:
        :func:`dict` - Similar words (keys) along with their score (values)
    """
    delta = 2
    ocr_sims = {}

    word_hash = ocr_key_hash(token)

    sim_hash_list = {}  # Using a dictionary avoids multiple entries if a key is retrieved twice
    key_index = -1

    for key, value in word_hash:
        key_index += 1
        sim_hash = deepcopy(word_hash)

        for d in range(-delta, delta+1):
            if d != 0:
                card = max(int(value)+d, 1)

                sim_hash[key_index] = (key, card)

                # Rebuild OCR key string
                sim_hash_str = ""
                for k, v in sim_hash:
                    sim_hash_str += k + str(v)

                if sim_hash_str in structures["ocrkeys"]:
                    card_diff = abs(int(value)-card)

                    sim_hash_list[sim_hash_str] = [(sim_word, card_diff)
                                                   for sim_word in structures["ocrkeys"][sim_hash_str]
                                                   if edit_distance(sim_word, token) <= 2]

    for sim_hash_str, sim_list in sim_hash_list.items():
        for sim_word, card_diff in sim_list:
            sim_score = rate_ocr_key(structures["occurence_map"], token, sim_word, card_diff)

            if sim_score > 0:
                ocr_sims[sim_word] = sim_score

    return ocr_sims


def truncate_ocr_sim_list(token, ocr_sims_list, limit=10):
    """Truncate the OCR key similarity list to a defined set of possibilities

    Parameters:
        token (:func:`str`): Initial token
        ocr_sims_list (dict): OCR similarities
        limit (int): Final number of similarities

    Returns:
        dict - List of similarities to keep
    """
    if len(ocr_sims_list) <= limit:
        return ocr_sims_list

    ocr_scores = set([sc for sim, sc in ocr_sims_list.items()])

    # Keep at most `limit` different scores
    sorted_ocr_scores = sorted(ocr_scores, reverse=True)[:limit]
    ocr_list = []
    for score in sorted_ocr_scores:
        tmp_ocr_list = [ocr_sims for ocr_sims, ocr_score in ocr_sims_list.items() if ocr_score == score]

        if len(ocr_list) + len(tmp_ocr_list) > limit:
            list_len = limit - len(ocr_list)
            tmp_list = []

            while len(tmp_list) < list_len:
                tmp_list += select_lower_edit_distance(token, tmp_ocr_list)

            if len(ocr_list) + len(tmp_list) == limit:  # Final list has exactly `limit` elements
                ocr_list += tmp_list
                break
            else:  # List has more than `limit` elements (need to choose only the n elements needed)
                alpha_tmp_list = []

                while len(alpha_tmp_list) != list_len:
                    alpha_word = select_best_alphabetical_word(token, tmp_list)

                    alpha_tmp_list.append(alpha_word)
                    tmp_list = [tkn for tkn in tmp_list if tkn != alpha_word]

                ocr_list += alpha_tmp_list
                break
        elif len(ocr_list) + len(tmp_ocr_list) == limit:
            ocr_list += tmp_ocr_list
            break
        else:  # len(ocr_list) + len(tmp_ocr_list) < limit
            ocr_list += tmp_ocr_list

    if len(ocr_list) != limit:
        raise IndexError("OCR list is still too big ("+str(len(ocr_list))+"/"+str(limit)+")")

    return {tkn: ocr_sims_list[tkn] for tkn in ocr_list}


def split_ocr_list(token, ocr_list):
    """Split the OCR list between strong and weak OCR words

    Parameters:
        token (:func:`str`): Token to correct
        ocr_list (:func:`dict`): List of possible OCR corrections
    Returns:
        tuple - Strong OCR words and weak OCR words
    """

    # Build the sorted OCR key list and divide it into 2 different stacks
    ocr_words = sorted(
        ocr_list.iteritems(),
        key=operator.itemgetter(1),
        reverse=True
    )
    strong_ocr_words = {tkn: sc for tkn, sc in ocr_words[:5]}
    weak_ocr_words = {tkn: sc for tkn, sc in ocr_words[5:]}

    min_strong_score = min([sc for tkn, sc in strong_ocr_words.items()])
    max_weak_score = max([sc for tkn, sc in weak_ocr_words.items()])

    if min_strong_score == max_weak_score:
        strong_tmp_ocr_words = [tkn for tkn, score in strong_ocr_words.items() if score == min_strong_score]
        weak_tmp_ocr_words = [tkn for tkn, score in weak_ocr_words.items() if score == min_strong_score]

        tmp_list = {t: min_strong_score for t in strong_tmp_ocr_words}
        tmp_list.update({t: min_strong_score for t in weak_tmp_ocr_words})

        tmp_strg_list = truncate_ocr_sim_list(token, tmp_list, len(strong_tmp_ocr_words))
        tmp_weak_list = [tkn for tkn in tmp_list.keys() if tkn not in tmp_strg_list]

        strong_ocr_words = {tkn: sc for tkn, sc in strong_ocr_words.items() if tkn not in tmp_list.keys()}
        strong_ocr_words.update(tmp_strg_list)

        weak_ocr_words = {tkn: sc for tkn, sc in weak_ocr_words.items() if tkn not in tmp_list.keys()}
        weak_ocr_words.update({tkn: min_strong_score for tkn in tmp_weak_list})

    return strong_ocr_words, weak_ocr_words


def build_candidates_list(token, anagrams_list, ocr_sims_list, structures):
    """Merge anagram and OCR key lists into one list.

    Parameters:
        token (:func:`str`): Cleaned token
        anagrams_list (:func:`dict`): Result of `select_anagrams`
        ocr_sims_list (:func:`dict`): Result of `select_ocrsims`
        structures (:func:`dict`): Datastructures from file

    Returns:
        :func:`dict` - Correction tokens (keys) along with their score (values)
    """
    final_list = anagrams_list

    ocr_list = truncate_ocr_sim_list(token, ocr_sims_list)

    strong_ocr_list = ocr_list
    weak_ocr_list = {}
    if len(ocr_list) > 5:
        (strong_ocr_list, weak_ocr_list) = split_ocr_list(token, ocr_list)

    for ocr_word, ocr_score in strong_ocr_list.items():
        if ocr_word in final_list.keys():
            final_list[ocr_word] *= ocr_score
            del strong_ocr_list[ocr_word]

    strong_ocr_list.update(weak_ocr_list)

    for ocr_word, ocr_score in strong_ocr_list.items():
        if ocr_word not in final_list.keys():
            final_list[ocr_word] = rate_anagram(structures["occurence_map"], token, ocr_word, 1) \
                * rate_ocr_key(structures["occurence_map"], token, ocr_word, 0)

    return final_list


def find_correct_case(word, case_mode, structures):
    """Select the best case among a set of already encountered cases

    Parameters:
        word (:func:`str`): Word to correct
        case_mode (int): Choice between lower or upper case (extra choice when undecided)
        structures (dict): List of structures needed to perform the choice
    Returns:
        :func:`str` - Corrected word
    """
    variations = {key: structures["occurence_map"][key] for key in structures["altcase"][word.lower()]}
    variations = sorted(variations.iteritems(), key=operator.itemgetter(1), reverse=True)

    tmp_vars = []
    if case_mode == 0:  # Upper case spelling
        for var in variations:
            _word = var[0]
            if _word[0].isupper() and sum(char.isupper() for char in _word) > 2:
                tmp_vars.append(var)

        if len(tmp_vars) == 0:
            tmp_vars = variations
    elif case_mode == 1:  # Lower case with capital initial
        for var in variations:
            _word = var[0]
            if _word[0].isupper() and sum(char.isupper() for char in _word) <= 2:
                tmp_vars.append(var)

        if len(tmp_vars) == 0:
            tmp_vars = variations
    else:  # case_mode == -1 (no capital letters found)
        tmp_vars = variations

    max_occ = tmp_vars[0][1]
    dist_vars = {term: edit_distance(word, term) for term, occ in tmp_vars if occ == max_occ}

    if len(dist_vars) == 1:
        return dist_vars.keys()[0]

    # Several terms with the maximum occurrence still exist
    dist_vars = sorted(dist_vars.iteritems(), key=operator.itemgetter(1))

    min_dist = dist_vars[0][1]
    min_dist_vars = [term for term, dist in dist_vars if dist == min_dist]

    if len(min_dist_vars) == 1:
        return min_dist_vars[0]

    # Several terms with the same Levenshtein distance exist
    term_ascii_code = {term: [ord(ch) for ch in term] for term in min_dist_vars}

    for ascii_code in term_ascii_code.values():
        for i in xrange(len(ascii_code)):
            code = ascii_code[i]

            # Non a-zA-Z chars will have a 0 value
            if code < 65 or 90 < code < 97 or code > 122:
                ascii_code[i] = 0

    if case_mode >= 0:
        ascii_val = min(term_ascii_code.values())

        t = [t for t, v in term_ascii_code.items() if v == ascii_val]

        if len(t) > 1:
            raise ValueError("Too many values in final array")

        return t[0]
    else:
        ascii_val = max(term_ascii_code.values())

        t = [t for t, v in term_ascii_code.items() if v == ascii_val]

        if len(t) > 1:
            raise ValueError("Too many values in final array")

        return t[0]


def correct_case(token, corrections_map, structures):
    """Select the best spelling for a word (case-wise)

    Parameters:
        token (:func:`str`): Cleaned token
        corrections_map (:func:`dict`): Result of `build_candidates_list`
        structures (:func:`dict`): Datastructures from file

    Returns:
        :func:`dict` - Corrected tokens (keys) along with their score (values)
    """
    alt_case_mode = -1  # Most common variation
    if token[0].isupper():
        if sum(char.isupper() for char in token) > 2:
            alt_case_mode = 0  # Upper case variation
        else:
            alt_case_mode = 1  # Lower case variation with capital first letter

    corrected_case_map = {}
    for correct_word, score in corrections_map.items():
        if correct_word.find(" ") != -1:
            words = correct_word.split(" ")

            keys_left = find_correct_case(words[0], alt_case_mode, structures)
            keys_right = find_correct_case(words[1], alt_case_mode, structures)
            key = keys_left+" "+keys_right
        else:
            key = find_correct_case(correct_word, alt_case_mode, structures)

        # If the key already exists we keep the highest score
        if key in corrected_case_map.keys():
            old_score = corrected_case_map[key]
            corrected_case_map[key] = max(old_score, score)
        else:
            corrected_case_map[key] = score

    return corrected_case_map


def apply_bigram_boost(paragraph, bigrams, occurence_map):
    """Compute the bigram boost for every token of a paragraph and apply it to the possible corrections.

    Parameters:
        paragraph (:func:`list`): List of lines
        bigrams (:func:`list`): Bigrams for the given paragraph
        occurence_map (:func:`dict`): Occurrence counts of unigrams and bigrams
    """
    token_index = -1

    for line in paragraph:
        for token in line.tokens:
            if token[2] is None:
                continue

            # Finding adjacent tokens
            adjacent_tokens = []

            if token_index > 0:
                adjacent_tokens.append(bigrams[token_index][0])
            else:
                adjacent_tokens.append(None)

            token_index += 1

            if token_index < len(bigrams):
                adjacent_tokens.append(bigrams[token_index][1])
            else:
                adjacent_tokens.append(None)

            # Normalizing adjacent tokens array
            for tkn_index in xrange(len(adjacent_tokens)):
                adj_tkn = adjacent_tokens[tkn_index]

                if adj_tkn is None:
                    adjacent_tokens[tkn_index] = []
                    continue

                if not adj_tkn[2] is None:
                    adjacent_tokens[tkn_index] = []
                    adj_tkn = sorted(adj_tkn[2].iteritems(), key=operator.itemgetter(1), reverse=True)

                    for idx in xrange(min(5, len(adj_tkn))):
                        adjacent_tokens[tkn_index].append(adj_tkn[idx][0].lower())
                else:
                    if not adj_tkn[1] is None:
                        adjacent_tokens[tkn_index] = [adj_tkn[1].lower()]
                    else:
                        adjacent_tokens[tkn_index] = [adj_tkn[0].lower()]

            # Computing bigram boost
            for correction in token[2].keys():
                bigram_boost = rate_bigram(correction.lower(), adjacent_tokens[0], adjacent_tokens[1], occurence_map)
                token[2][correction] *= bigram_boost


def select_lower_edit_distance(ref_word, word_list):
    """Get the words with the lowest edit distance

    Parameters:
        ref_word (:func:`str`): Word to correct
        word_list (list): List of proposals

    Returns:
        list - Words at the lowest edit distance from `ref_word`
    """
    word_dict = {word: edit_distance(ref_word, word) for word in word_list}
    min_dist = min(word_dict.values())

    return [word for word, dist in word_dict.items() if dist == min_dist]


def select_by_hash(word_list):
    """Select the word with the lowest md5 hash

    Parameters:
        word_list (list): List of proposals

    Returns:
        :func:`str` - Selected word
    """
    hashes = set([md5(word).hexdigest() for word in word_list])

    if len(hashes) != len(word_list):
        raise Exception("differentiation impossible")

    return [tkn for tkn in word_list if md5(tkn).hexdigest() == min(hashes)][0]


def select_best_alphabetical_word(ref_word, word_list):
    """Select the closest alphabetical word (non-alphabetic chars are all given the same value)

    Parameters:
        ref_word (:func:`str`): Word to correct
        word_list (list): List of proposals

    Returns:
        :func:`str` - Selected word
    """
    case_mode = -1 if ref_word[0].isupper() else 0
    term_ascii_code = {term: [ord(ch) for ch in term] for term in word_list}

    for ascii_code in term_ascii_code.values():
        for i in xrange(len(ascii_code)):
            code = ascii_code[i]

            # Non a-zA-Z chars will have a 0 value
            if code < 65 or 90 < code < 97 or code > 122:
                ascii_code[i] = 0

    if case_mode >= 0:
        ascii_val = min(term_ascii_code.values())

        tkn_list = [t for t, v in term_ascii_code.items() if v == ascii_val]

        if len(tkn_list) > 1:
            return select_by_hash(tkn_list)

        return tkn_list[0]
    else:
        ascii_val = max(term_ascii_code.values())

        tkn_list = [t for t, v in term_ascii_code.items() if v == ascii_val]

        if len(tkn_list) > 1:
            return select_by_hash(tkn_list)

        return tkn_list[0]


def select_correction(word, corrections_map):
    """Select the best correction for a word given its score

    Parameters:
        word (str): Word to select a correction for.
        corrections_map (:func:`dict`): Dictionary containing all corrections for a token along with their score

    Returns:
        :func:`dict` - Chosen correction(s) along with their score
    """
    if corrections_map is None or len(corrections_map) == 1:
        return corrections_map

    max_val = max(corrections_map.values())
    final_list = {term: val for term, val in corrections_map.items() if val == max_val}

    if len(final_list) == 1:  # One value has the maximum
        if final_list.values()[0] > 0.7:  # Highly valued terms are chosen by default
            return final_list

        first_word = final_list.keys()[0]

        # If the threshold value has not been reached we are looking for a second term
        del corrections_map[final_list.keys()[0]]

        max_val = max(corrections_map.values())
        tmp_list = {term: val for term, val in corrections_map.items() if val == max_val}

        if len(tmp_list) == 1:  # One value has the second highest score
            final_list.update(tmp_list)
            second_word = tmp_list.keys()[0]
        else:  # Several terms with the same score
            # Differentiation on the Levenshtein distance
            tmp_list = select_lower_edit_distance(word, tmp_list.keys())

            if len(tmp_list) == 1:  # One term has the lowest edit distance
                final_list[tmp_list[0]] = max_val
                second_word = tmp_list[0]
            else:  # Several terms with the same edit distance
                # Choose the best alphabetical term
                second_word = select_best_alphabetical_word(word, tmp_list)
                final_list[second_word] = max_val

        # Determine if we need one or two terms
        if log(final_list[first_word] / final_list[second_word]) >= 1:
            del final_list[second_word]

        return final_list
    elif len(final_list) != 2:  # More than 2 values share the same maximum
        tmp_list = select_lower_edit_distance(word, final_list.keys())

        if len(tmp_list) == 1:  # One word gets the min edit distance
            first_word = tmp_list[0]
            tmp_final_list = final_list
            del tmp_final_list[first_word]

            tmp_list = select_lower_edit_distance(word, tmp_final_list.keys())

            if len(tmp_list) == 1:  # One word gets the second minimal edit distance
                final_list = {
                    first_word: max_val,
                    tmp_list[0]: max_val
                }

                return final_list
            else:  # The second minimal edit distance is shared by several terms
                best_term = select_best_alphabetical_word(word, tmp_list)

                final_list = {
                    first_word: max_val,
                    best_term: max_val
                }

                return final_list
        elif len(tmp_list) == 2:  # Exactly two words get the same min edit distance
            final_list = {
                tmp_list[0]: max_val,
                tmp_list[1]: max_val
            }

            return final_list
        else:  # More than two words share the min edit distance
            best_term_1 = select_best_alphabetical_word(word, tmp_list)

            tmp_list = [term for term in tmp_list if term != best_term_1]
            best_term_2 = select_best_alphabetical_word(word, tmp_list)

            final_list = {
                best_term_1: max_val,
                best_term_2: max_val
            }

            return final_list
    else:  # Two words with the same score
        return final_list


def extract_paragraph_bigrams(paragraph):
    """Get bigrams for a given paragraph

    Parameters:
        paragraph (list): Paragraph to extract bigrams from

    Returns:
        list - Bigram list
    """
    p_tokens = [token for line in paragraph for token in line.tokens]
    bigram_list = []

    for index in xrange(len(p_tokens) - 1):
        bigram_list.append((p_tokens[index], p_tokens[index + 1]))

    return bigram_list
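This page shows only the selection utilities, not how the denoiser chains them; the docstrings, however, indicate the intended order (select_anagrams and select_ocrsims feed build_candidates_list, whose result goes through correct_case and finally select_correction). A hypothetical wiring sketch for a single cleaned token, assuming `structures` is the datastructures dict loaded elsewhere in the denoiser:

def correct_token(token, structures):
    # Hypothetical helper, not part of this module: chains the steps in docstring order.
    anagrams = select_anagrams(token, structures)    # anagram-hash candidates
    ocr_sims = select_ocrsims(token, structures)     # OCR-key candidates
    candidates = build_candidates_list(token, anagrams, ocr_sims, structures)
    candidates = correct_case(token, candidates, structures)
    return select_correction(token, candidates)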