Completed
Push — master ( 7b86c6...8c3cbf )
by P.R.
01:53
created

UniversalCsvReader._get_sample()   A

Complexity

Conditions 1

Size

Total Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 10
ccs 0
cts 4
cp 0
crap 2
rs 9.4285
1
"""
2
ETLT
3
4
Copyright 2016 Set Based IT Consultancy
5
6
Licence MIT
7
"""
8
import bz2
9
import csv
10
from itertools import zip_longest
11
12
import chardet
13
14
from etlt.reader.Reader import Reader
15
from etlt.reader.UniversalCsvReaderFormatHelper import UniversalCsvReaderFormatHelper
16
17
18
class UniversalCsvReader(Reader):
    """
    A universal CSV file reader.
    - Open uncompressed and gz, bz2 compressed files.
    - Auto encoding, field delimiter, and line ending detection.
    """
    sample_size = 64 * 1024
    """
    The maximum number of bytes (binary mode) or characters (text mode) read from a file for auto detection.

    :type: int
    """

    line_endings = ['\r\n', '\n\r', '\n', '\r']
    """
    The line endings considered by the auto detection, in order of precedence when counts are equal.

    :type: list[str]
    """

    delimiters = [',', ';', '\t', '|', ':']
    """
    The field delimiters considered by the auto detection, in order of precedence when counts are equal.

    :type: list[str]
    """

    # ------------------------------------------------------------------------------------------------------------------
    def __init__(self, filenames, format_helper=None):
        """
        Object constructor.

        :param list(str) filenames: A list of CSV file names.
        :param format_helper: The helper that supplies and post-processes the formatting parameters. When omitted
                              (or falsy) a default UniversalCsvReaderFormatHelper is created.
        """
        Reader.__init__(self)

        self._filenames = filenames
        """
        The name of the CSV files.

        :type: list
        """

        self._file = None
        """
        The current actual file object. None when no file is open.
        """

        self._csv_reader = None
        """
        The current actual CSV file object.

        :type: _csv.reader
        """

        self._filename = None
        """
        The name of the current file.

        :type: str|None
        """

        self._helper = format_helper or UniversalCsvReaderFormatHelper()
        """
        The helper for detecting the appropriate formatting parameters for reading the current CSV file.

        :type: etlt.reader.UniversalCsvReaderFormatHelper.UniversalCsvReaderFormatHelper
        """

        self._formatting_parameters = dict()
        """
        The CSV formatting parameters for reading the current CSV file.

        :type: dict[str,str]
        """

        self._sample = None
        """
        The sample read from the current file for auto detection: bytes when read in binary mode, str when read in
        text mode.

        :type: bytes|str|None
        """

    # ------------------------------------------------------------------------------------------------------------------
    def __enter__(self):
        """
        Enters the runtime context. No resources are acquired here; files are opened lazily by next().

        :rtype: UniversalCsvReader
        """
        # Return the reader itself so that "with UniversalCsvReader(...) as reader:" binds a usable object.
        return self

    # ------------------------------------------------------------------------------------------------------------------
    def __exit__(self, *_):
        """
        Leaves the runtime context and closes the current file, if any. Exceptions are not suppressed.
        """
        self._close()

    # ------------------------------------------------------------------------------------------------------------------
    def get_source_name(self):
        """
        Returns the current source file.

        :rtype str|None:
        """
        return self._filename

    # ------------------------------------------------------------------------------------------------------------------
    def next(self):
        """
        Yields the next row from the source files.

        Each row is yielded as a dict that maps the names in self._fields to the values of the row; missing values
        (and missing names) are filled with an empty string.

        NOTE(review): self._fields and self._row_number are not set in this class; presumably they are provided by
        the parent class Reader.
        """
        for self._filename in self._filenames:
            self._open()
            try:
                for row in self._csv_reader:
                    self._row_number += 1
                    yield dict(zip_longest(self._fields, row, fillvalue=''))
            finally:
                # Also runs when the consumer abandons the generator halfway through a file.
                self._close()
            self._row_number = -1

        self._filename = None
        # The generator ends by returning: an explicit "raise StopIteration" inside a generator is converted into
        # a RuntimeError since Python 3.7 (PEP 479).

    # ------------------------------------------------------------------------------------------------------------------
    def _open_file(self, mode, encoding=None):
        """
        Opens the current file and stores the file object in self._file.

        Files with extension .bz2 or .gz are decompressed transparently; any other file is opened as is.

        :param str mode: The mode for opening the file.
        :param str encoding: The encoding of the file. Must be None when the file is opened in binary mode.
        """
        # Imported locally so that this method does not depend on the module's import block.
        import gzip

        # The csv module requires text files to be opened with newline='' such that line endings reach the parser
        # (and the line ending detection) untranslated. Binary mode does not accept a newline argument.
        newline = '' if 't' in mode else None

        if self._filename.endswith('.bz2'):
            opener = bz2.open
        elif self._filename.endswith('.gz'):
            opener = gzip.open
        else:
            opener = open

        self._file = opener(self._filename, mode=mode, encoding=encoding, newline=newline)

    # ------------------------------------------------------------------------------------------------------------------
    def _close(self):
        """
        Closes the current file. Does nothing when no file is open.
        """
        if self._file:
            self._file.close()
            self._file = None

    # ------------------------------------------------------------------------------------------------------------------
    def _get_sample(self, mode, encoding):
        """
        Reads a sample of at most sample_size bytes or characters from the current file into self._sample.

        :param str mode: The mode for opening the file.
        :param str|None encoding: The encoding of the file. None for opening the file in binary mode.
        """
        self._open_file(mode, encoding)
        try:
            self._sample = self._file.read(UniversalCsvReader.sample_size)
        finally:
            # Release the file handle even when reading (or decoding) the sample fails.
            self._close()

    # ------------------------------------------------------------------------------------------------------------------
    def _detect_encoding(self):
        """
        Detects the encoding of the current file from the (binary) sample and stores it under key 'encoding' in the
        formatting parameters.
        """
        self._formatting_parameters['encoding'] = chardet.detect(self._sample)['encoding']

    # ------------------------------------------------------------------------------------------------------------------
    def _most_frequent(self, candidates, default):
        """
        Returns the candidate that occurs most often in the current sample.

        When several candidates occur equally often the one listed first wins. When none of the candidates occurs in
        the sample the default is returned.

        :param list[str] candidates: The strings to count in the sample.
        :param str default: The value to return when no candidate occurs in the sample.

        :rtype: str
        """
        best_value = default
        best_count = 0
        for candidate in candidates:
            count = self._sample.count(candidate)
            if count > best_count:
                best_value = candidate
                best_count = count

        return best_value

    # ------------------------------------------------------------------------------------------------------------------
    def _detect_delimiter(self):
        """
        Detects the field delimiter in the sample data and stores it under key 'delimiter' in the formatting
        parameters. Falls back to a comma when the sample contains none of the known delimiters.
        """
        self._formatting_parameters['delimiter'] = self._most_frequent(UniversalCsvReader.delimiters, ',')

    # ------------------------------------------------------------------------------------------------------------------
    def _detect_line_ending(self):
        """
        Detects the line ending in the sample data and stores it under key 'line_terminator' in the formatting
        parameters. Falls back to a line feed when the sample contains no line ending at all.

        A '\\r\\n' sequence is also counted as one '\\n' and as one '\\r'. Because '\\r\\n' is listed first and only a
        strictly higher count replaces the current candidate, data with only '\\r\\n' line endings yields '\\r\\n'.
        """
        self._formatting_parameters['line_terminator'] = self._most_frequent(UniversalCsvReader.line_endings, '\n')

    # ------------------------------------------------------------------------------------------------------------------
    def _open(self):
        """
        Opens the current file with proper settings for encoding and delimiter.

        The formatting parameters are determined in three steps: the helper's pass1 may replace the defaults, every
        parameter that is still 'auto' is detected from a sample of the file, and the helper's pass2 gets the final
        say. Afterwards self._file and self._csv_reader are ready for reading rows.
        """
        self._sample = None

        defaults = {'encoding':        'auto',
                    'delimiter':       'auto',
                    'line_terminator': 'auto',
                    'escape_char':     '\\',
                    'quote_char':      '"'}
        parameters = self._helper.pass1(self._filename, defaults)
        # Both names refer to the same dict: values stored by the _detect_* methods are visible through parameters.
        self._formatting_parameters = parameters

        # Detect encoding.
        if parameters['encoding'] == 'auto':
            self._get_sample('rb', None)
            self._detect_encoding()

        # After detecting the encoding self._sample holds bytes. Keep track of whether a decoded sample has been
        # read, the delimiter and line ending detection count str values.
        has_text_sample = False

        # Detect delimiter.
        if parameters['delimiter'] == 'auto':
            self._get_sample('rt', parameters['encoding'])
            has_text_sample = True
            self._detect_delimiter()

        # Detect line terminators.
        if parameters['line_terminator'] == 'auto':
            if not has_text_sample:
                self._get_sample('rt', parameters['encoding'])
            self._detect_line_ending()

        self._formatting_parameters = self._helper.pass2(self._filename,
                                                         self._formatting_parameters,
                                                         parameters)

        # NOTE(review): the file is opened with the encoding as it was before pass2; confirm whether an encoding
        # changed by pass2 should be honoured here.
        self._open_file('rt', parameters['encoding'])
        # csv.reader ignores lineterminator (it always recognises '\r' and '\n'); it is passed for completeness.
        self._csv_reader = csv.reader(self._file,
                                      delimiter=self._formatting_parameters['delimiter'],
                                      escapechar=self._formatting_parameters['escape_char'],
                                      lineterminator=self._formatting_parameters['line_terminator'],
                                      quotechar=self._formatting_parameters['quote_char'])
225
226
# ----------------------------------------------------------------------------------------------------------------------
227