Completed · Branch master (e214b7) · by Philippe · 36s · created

src.denoiser.text.Text (rating: B)

Complexity

Total Complexity 48

Size/Duplication

Total Lines 231
Duplicated Lines 0 %
Metric                            Value
dl (duplicated lines)             0
loc (lines of code)               231
rs                                8.4864
wmc (weighted methods per class)  48

7 Methods

Rating  Name                           Duplication  Size  Complexity
C       Text.read_txt()                0            39    7
B       Text.get_garbage_lines()       0            17    6
B       Text.get_clean_lines()         0            17    6
A       Text.__init__()                0            14    1
F       Text.read_csv()                0            48    9
F       Text.retrieve_text_score()     0            61    13
B       Text.get_unclassified_lines()  0            17    6

How to fix: Complexity

Complex Class

Complex classes like src.denoiser.text.Text often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster. A sketch of the first option follows.
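
Here, for example, read_csv() and read_txt() duplicate the same statistics bookkeeping, which suggests one cohesive component. Below is a minimal Extract Class sketch, assuming only the set_stat()/get_stat() calls visible in the listing; the LineStatsAccumulator name is hypothetical:

    class LineStatsAccumulator(object):
        """Hypothetical component extracted from Text: owns the per-line statistics."""

        def __init__(self, stats):
            self.stats = stats  # The existing Statistics instance built by Text.__init__()

        def add_line(self, line_object):
            # The counter updates that read_csv() and read_txt() currently duplicate
            self.stats.set_stat("line_nb", self.stats.get_stat("line_nb") + 1)
            self.stats.set_stat("line_total_length", self.stats.get_stat("line_total_length") + len(line_object))
            self.stats.set_stat("word_total_nb", self.stats.get_stat("word_total_nb") + len(line_object.tokens))

            words_len = sum(len(tkn) for tkn in line_object.tokens)
            self.stats.set_stat("word_total_length", self.stats.get_stat("word_total_length") + words_len)

        def finalize(self):
            # The average computations that both readers repeat verbatim
            self.stats.set_stat("line_avg_length",
                                self.stats.get_stat("line_total_length") / self.stats.get_stat("line_nb"))
            self.stats.set_stat("word_avg_length",
                                self.stats.get_stat("word_total_length") / self.stats.get_stat("word_total_nb"))
            self.stats.set_stat("word_avg_nb",
                                self.stats.get_stat("word_total_nb") / self.stats.get_stat("line_nb"))

With such a component, read_csv() and read_txt() shrink to pure parsing loops that call add_line() for each line and finalize() once at the end, which directly reduces their size and complexity scores.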

"""This module contains necessary classes to parse a file in order to get the :class:`.Text` object.

.. Authors:
    Philippe Dessauw
    [email protected]

.. Sponsor:
    Alden Dima
    [email protected]
    Information Systems Group
    Software and Systems Division
    Information Technology Laboratory
    National Institute of Standards and Technology
    http://www.nist.gov/itl/ssd/is
"""
from __future__ import division
import re
from nltk.tokenize import word_tokenize
from unidecode import unidecode
import codecs
from collections import Counter
import csv
import logging
from numpy import mean
from denoiser.text.stats import Statistics


def tokenize(line):
    """Separate a line to get clean tokens out of it

    Parameters:
        line (:func:`str`): A line of text

    Returns:
        list - List of different tokens
    """
    separators = "=+/,.:;!?%<>#()&[]{}"

    tokens = []
    tokenized_line = word_tokenize(line)  # Will get rid of most of the separators

    for word in tokenized_line:
        tmp_tokens = [unidecode(word)]

        for separator in separators:
            sep_tokens = []

            for tmp_token in tmp_tokens:
                split_token = tmp_token.split(separator)

                if len(split_token) != 1:  # Token has been split
                    # Concatenating the list of tokens with the separator
                    tkn_sep_list = []

                    for ind, tkn in enumerate(split_token):
                        tkn_sep_list.append(tkn)

                        if ind != len(split_token) - 1:  # Avoid adding the separator at the end
                            tkn_sep_list.append(unicode(separator))

                    sep_tokens += tkn_sep_list
                else:
                    sep_tokens += split_token

            tmp_tokens = sep_tokens

        tokens += [tkn for tkn in tmp_tokens if tkn != '']

    return tokens
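
# Illustrative example (assumed NLTK word_tokenize behavior, Python 2):
#   tokenize(u"x=1;y=2") -> [u'x', u'=', u'1', u';', u'y', u'=', u'2']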


def clean_head_tail(word):
    """Clean head and tail of a word

    Parameters:
        word (:func:`str`): The word to clean

    Returns:
        :func:`str` - Cleaned word
    """
    cleaning_regexp = re.compile(r"^[^a-zA-Z'-]*([a-zA-Z'-](.*[a-zA-Z'-])?)[^a-zA-Z'-]*$")
    alpha_regexp = re.compile(r"[a-zA-Z]")

    word_groups = cleaning_regexp.findall(word)

    # Non-matching strings are set as dirty (i.e. cannot be cleaned)
    # None is returned
    if len(word_groups) == 0:
        return None

    # Words containing no letters are set to None
    if alpha_regexp.search(word_groups[0][0]) is None:
        return None

    return word_groups[0][0]
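
# Illustrative: clean_head_tail(u"(word).") returns u"word";
# clean_head_tail(u"123") returns None (no letters to keep)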


class Text(object):
    """Stores the text from the filename given in parameters

    Args:
        fname (str): Path to the file.

    Attributes:
        filename (:func:`str`): Name of the file.
        text (:func:`list`): List of paragraphs. Every paragraph is a list of :class:`.Line`.
        stats (:class:`.Statistics`): Statistics object.
    """

    def __init__(self, fname):
        self.filename = fname
        self.text = []
        self.contains_training_data = False

        self.stats = Statistics(["line_nb", "line_avg_length", "line_total_length", "word_avg_length",
                                 "word_total_length", "word_avg_nb", "word_total_nb"])
        self.stats.set_stat("line_nb", 0)
        self.stats.set_stat("line_avg_length", 0)
        self.stats.set_stat("line_total_length", 0)
        self.stats.set_stat("word_avg_length", 0)
        self.stats.set_stat("word_total_length", 0)
        self.stats.set_stat("word_avg_nb", 0)
        self.stats.set_stat("word_total_nb", 0)

    def read_csv(self):
        """Read a CSV file and build the associated text object
        """
        self.contains_training_data = True

        with open(self.filename, "r") as f:
            csv_reader = csv.reader(f)
            paragraph = []

            for row in csv_reader:
                if len(row) != 2:
                    if len(paragraph) != 0:
                        self.text.append(paragraph)
                        paragraph = []

                    continue

                line = unicode(row[0].decode("utf-8"))
                line = line.strip(" \t\r\n")

                if len(line) == 0:
                    if len(paragraph) != 0:
                        self.text.append(paragraph)
                        paragraph = []

                    continue

                line_object = Line(line, row[1])
                paragraph.append(line_object)

                self.stats.set_stat("line_nb", self.stats.get_stat("line_nb")+1)
                self.stats.set_stat("line_total_length", self.stats.get_stat("line_total_length")+len(line_object))
                self.stats.set_stat("word_total_nb", self.stats.get_stat("word_total_nb") + len(line_object.tokens))

                words_len = sum([len(tkn) for tkn in line_object.tokens])
                self.stats.set_stat("word_total_length", self.stats.get_stat("word_total_length") + words_len)

            if len(paragraph) != 0:
                self.text.append(paragraph)

        self.stats.set_stat("line_avg_length",
                            self.stats.get_stat("line_total_length") / self.stats.get_stat("line_nb"))
        self.stats.set_stat("word_avg_length",
                            self.stats.get_stat("word_total_length") / self.stats.get_stat("word_total_nb"))
        self.stats.set_stat("word_avg_nb",
                            self.stats.get_stat("word_total_nb") / self.stats.get_stat("line_nb"))

        logging.debug(self.filename+" read")

    def read_txt(self):
        """Read a text file and build the associated text object
        """
        self.contains_training_data = False

        with codecs.open(self.filename, "rb", encoding="utf-8") as f:
            paragraph = []

            for line in f:
                line = line.strip(" \t\r\n")

                if len(line) == 0:
                    if len(paragraph) != 0:
                        self.text.append(paragraph)
                        paragraph = []

                    continue

                line_object = Line(line)
                paragraph.append(line_object)

                self.stats.set_stat("line_nb", self.stats.get_stat("line_nb")+1)
                self.stats.set_stat("line_total_length", self.stats.get_stat("line_total_length")+len(line_object))
                self.stats.set_stat("word_total_nb", self.stats.get_stat("word_total_nb") + len(line_object.tokens))

                words_len = sum([len(tkn) for tkn in line_object.tokens])
                self.stats.set_stat("word_total_length", self.stats.get_stat("word_total_length") + words_len)

            if len(paragraph) != 0:
                self.text.append(paragraph)

        self.stats.set_stat("line_avg_length",
                            self.stats.get_stat("line_total_length") / self.stats.get_stat("line_nb"))
        self.stats.set_stat("word_avg_length",
                            self.stats.get_stat("word_total_length") / self.stats.get_stat("word_total_nb"))
        self.stats.set_stat("word_avg_nb",
                            self.stats.get_stat("word_total_nb") / self.stats.get_stat("line_nb"))

        logging.debug(self.filename+" read")

    def get_clean_lines(self):
        """Returns clean lines from the text object

        Returns:
            list: List of clean lines
        """
        lines = []

        for paragraph in self.text:
            for line in paragraph:
                if line.grade == 5:
                    lines.append(line.get_clean_line())

            if len(lines) > 0 and lines[-1] != "":
                lines.append("")

        return lines

    def get_garbage_lines(self):
        """Returns garbage lines from the text object

        Returns:
            list: List of garbage lines
        """
        lines = []

        for paragraph in self.text:
            for line in paragraph:
                if line.grade == 0:
                    lines.append(line.get_orig_line())

            if len(lines) > 0 and lines[-1] != "":
                lines.append("")

        return lines

    def get_unclassified_lines(self):
        """Returns unclassified lines from the text object

        Returns:
            list: List of unclassified lines
        """
        lines = []

        for paragraph in self.text:
            for line in paragraph:
                if line.grade % 5 != 0:  # Grade is neither 0 nor 5
                    lines.append(line.get_orig_line())

            if len(lines) > 0 and lines[-1] != "":
                lines.append("")

        return lines

    def retrieve_text_score(self):
        """Returns some stats and scores regarding the classification

        Returns:
            dict: Dictionary containing the results
        """
        # A true positive is a garbage string detected as such
        score_stats = {"FP": 0, "TP": 0, "FN": 0, "TN": 0}
        class_stats = {"classified": 0, "unclassified": 0, "unrated": 0}

        for paragraph in self.text:
            for line in paragraph:
                if line.grade != 0 and line.grade != 5:
                    class_stats["unclassified"] += 1
                    continue

                if line.result is None or line.result < 0:
                    class_stats["unrated"] += 1
                    continue

                class_stats["classified"] += 1

                if line.grade == 0:  # Line detected as garbage
                    if line.result == 1:  # Line is clean
                        score_stats["FP"] += 1  # False positive
                    else:  # Line is garbage
                        score_stats["TP"] += 1  # True positive
                else:  # Line detected as clean
                    if line.result == 1:  # Line is clean
                        score_stats["TN"] += 1  # True negative
                    else:  # Line is garbage
                        score_stats["FN"] += 1  # False negative

        # Precision
        divider_pr = score_stats["TP"] + score_stats["FP"]
        if divider_pr != 0:
            precision = score_stats["TP"] / divider_pr
        else:
            precision = 0

        # Recall
        divider_rc = score_stats["TP"] + score_stats["FN"]
        if divider_rc != 0:
            recall = score_stats["TP"] / divider_rc
        else:
            recall = 0

        # F1 score
        if precision + recall != 0:
            f1 = 2 * precision * recall / (precision + recall)
        else:
            f1 = 0
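
        # Worked example (illustrative): TP=8, FP=2, FN=4 gives
        # precision = 8/10 = 0.8, recall = 8/12 ~= 0.667, F1 ~= 0.727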

        return {
            "class": class_stats,
            "score": {
                "precision": precision,
                "recall": recall,
                "f1": f1
            },
            "raw": score_stats
        }


class Line(object):
    """Represents a line of text and provides data structures to handle it.

    Args:
        string (str): Line to parse.
        result (int): (**Optional**) Expected result for a line (either a garbage string or a clean line).

    Attributes:
        tokens (:func:`list`): List of tokens contained in the initial string. Every list element is a :func:`list`
            of 3 elements organized in this order: `(original_token, clean_token, corrected_token)`.
        pos_string (:func:`str`): Reference string containing the position of all the tokens.
        result (:func:`int` or :data:`None`): Expected result for a line. Helps compute the fitness (F1 score) of
            the algorithm.
        grade (:func:`int`): Grade of a line, between 0 (garbage string) and 5 (clean line).
        stats (:func:`dict`): Dictionary containing two :class:`.Statistics` objects. Each of them counts the
            number of **lower**, **upper** and **special** characters along with **numbers**.
    """

    def __init__(self, string, result=None):
        self.tokens = [[tkn, clean_head_tail(tkn), None] for tkn in tokenize(string)]

        self.pos_string = string  # String containing the position of each token (e.g. "%0 %1%2 ... %n")
        for index, token in enumerate(self.tokens):
            self.pos_string = self.pos_string.replace(token[0], "%"+str(index), 1)
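        # Illustrative: u"Hello, world" yields tokens [u'Hello', u',', u'world']
        # and pos_string u"%0%1 %2" (assumed NLTK tokenizer behavior)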

        self.result = None
        if result is not None:
            self.result = int(result)

        if sum([len(t[1]) for t in self.tokens if t[1] is not None]) == 0:
            self.grade = 0
        else:
            self.grade = 3

        self.stats = {
            "orig": Statistics(["lw_char", "up_char", "nb_char", "sp_char"]),
            "clean": None
        }

        tmp_line = re.sub(r'[a-z]', 'a', self.get_orig_line())  # Lower chars replacement
        tmp_line = re.sub(r'[A-Z]', 'A', tmp_line)  # Upper chars replacement
        tmp_line = re.sub(r'[0-9]', '0', tmp_line)  # Numbers replacement
        tmp_line = re.sub(r'[^a-zA-Z0-9 ]', '#', tmp_line)  # Special chars replacement
        line_stats = Counter(tmp_line)

        self.stats["orig"].set_stat("lw_char", line_stats["a"])
        self.stats["orig"].set_stat("up_char", line_stats["A"])
        self.stats["orig"].set_stat("nb_char", line_stats["0"])
        self.stats["orig"].set_stat("sp_char", line_stats["#"])
380
381
    def raise_grade(self):
382
        """Add 1 to the grade of the line (up to 5)
383
        """
384
        if self.grade < 5:
385
            self.grade += 1
386
387
    def decrease_grade(self):
388
        """Remove 1 to the grade of the line (down to 0)
389
        """
390
        if self.grade > 0:
391
            self.grade -= 1
392
393
    def set_garbage(self):
394
        """Set the grade to 0
395
        """
396
        self.grade = 0
397
398
    def set_clean(self):
399
        """Set the grade to 5
400
        """
401
        self.grade = 5
402
403
    def get_orig_line(self):
        """Returns the original line

        Returns:
            str: Original line
        """
        string = self.pos_string

        for index, token in reversed(list(enumerate(self.tokens))):
            string = string.replace("%"+str(index), token[0])

        return string

    def get_clean_line(self):
        """Returns the clean line

        Returns:
            str: Clean line
        """
        string = self.pos_string

        for index, token in reversed(list(enumerate(self.tokens))):
            if token[2] is not None and len(token[2]) > 0:
                string = string.replace("%"+str(index), token[2].keys()[0])
            else:  # Inline correction is not available
                if token[1] is not None:
                    string = string.replace("%"+str(index), token[1])
                else:  # Clean token does not exist, use the original token
                    string = string.replace("%"+str(index), token[0])

        return re.sub(" +", " ", string).strip()

    def get_orig_stats(self):
        """Get original stats of the line

        Returns:
            Statistics: Statistics of the original line
        """
        return self.stats["orig"]
442
443
    def get_clean_stats(self):
444
        """Get clean stats of the line
445
446
        Returns:
447
            Statistics: Statistics of the clean line
448
        """
449
        if self.stats["clean"] is None:  # Compute clean stats if it is not already done
450
            self.stats["clean"] = Statistics(["lw_char", "up_char", "nb_char", "sp_char"])
451
452
            tmp_line = re.sub(r'[a-z]', 'a', self.get_clean_line())  # Lower chars replacement
453
            tmp_line = re.sub(r'[A-Z]', 'A', tmp_line)  # Upper chars replacement
454
            tmp_line = re.sub(r'[0-9]', '0', tmp_line)  # Numbers replacement
455
            tmp_line = re.sub(r'[^a-zA-Z0-9 ]', '#', tmp_line)  # Special chars replacement
456
            line_stats = Counter(tmp_line)
457
458
            self.stats["clean"].set_stat("lw_char", line_stats["a"])
459
            self.stats["clean"].set_stat("up_char", line_stats["A"])
460
            self.stats["clean"].set_stat("nb_char", line_stats["0"])
461
            self.stats["clean"].set_stat("sp_char", line_stats["#"])
462
463
        return self.stats["clean"]

    def get_line_score(self):
        """Return a global score of the line

        Returns:
            float: Score of the line
        """
        score = 0

        if len(self.tokens) == 0:
            return score

        for token in [t[2] for t in self.tokens if t[2] is not None]:
            score += mean([s for s in token.values()])

        return score / len(self.tokens)

    def __len__(self):
        return len(self.get_orig_line())

    def __str__(self):
        return str(self.tokens) + " | " + str(self.grade)