Passed
Branch master (17b603)
by P.R.
01:31
created

UniversalCsvReader._open()   B

Complexity

Conditions 5

Size

Total Lines 42
Code Lines 29

Duplication

Lines 42
Ratio 100 %

Code Coverage

Tests 0
CRAP Score 30

Importance

Changes 0
Metric Value
cc 5
eloc 29
nop 1
dl 42
loc 42
ccs 0
cts 19
cp 0
crap 30
rs 8.7173
c 0
b 0
f 0
1
"""
2
ETLT
3
4
Copyright 2016 Set Based IT Consultancy
5
6
Licence MIT
7
"""
8
import bz2
9
import csv
10
from itertools import zip_longest
11
12
import chardet
13
14
from etlt.reader.Reader import Reader
15
from etlt.reader.UniversalCsvReaderFormatHelper import UniversalCsvReaderFormatHelper
16
17
18 View Code Duplication
class UniversalCsvReader(Reader):
    """
    A universal CSV file reader.

    - Opens uncompressed files and gzip (.gz) or bzip2 (.bz2) compressed files.
    - Auto-detects the encoding, the field delimiter, and the line terminator.
    """
    # The maximum number of bytes (binary mode) or characters (text mode) read as sample for auto-detection.
    sample_size = 64 * 1024

    # The candidate line endings, in order of precedence when two candidates occur equally often.
    line_endings = ['\r\n', '\n\r', '\n', '\r']

    # The candidate field delimiters, in order of precedence when two candidates occur equally often.
    delimiters = [',', ';', '\t', '|', ':']

    # ------------------------------------------------------------------------------------------------------------------
    def __init__(self, filenames, format_helper=None):
        """
        Object constructor.

        :param list(str) filenames: A list of CSV file names.
        :param format_helper: The helper object whose pass1() and pass2() methods adjust the formatting parameters
                              of each file. When omitted (or falsy) a UniversalCsvReaderFormatHelper is created.
        """
        Reader.__init__(self)

        self._filenames = filenames
        """
        The name of the CSV files.

        :type: list
        """

        self._file = None
        """
        The current actual file object.
        """

        self._csv_reader = None
        """
        The current actual CSV file object.

        :type: _csv.reader
        """

        self._filename = None
        """
        The name of the current file.

        :type: str|None
        """

        self._helper = format_helper or UniversalCsvReaderFormatHelper()
        """
        The helper for detecting the appropriate formatting parameters for reading the current CSV file.

        :type: etlt.reader.UniversalCsvReaderFormatHelper.UniversalCsvReaderFormatHelper
        """

        self._formatting_parameters = dict()
        """
        The CSV formatting parameters for reading the current CSV file.

        :type: dict[str,str]
        """

        self._sample = None
        """
        The sample when detecting automatically formatting parameters.

        :type: None|str|bytes
        """

    # ------------------------------------------------------------------------------------------------------------------
    def __enter__(self):
        """
        Enters the runtime context. No resources are acquired here; files are opened lazily by next().

        :rtype: UniversalCsvReader
        """
        # Return the reader itself such that "with UniversalCsvReader(...) as reader:" binds a usable object.
        return self

    # ------------------------------------------------------------------------------------------------------------------
    def __exit__(self, *_):
        """
        Leaves the runtime context and closes the current file (if any). Exceptions are not suppressed.
        """
        self._close()

    # ------------------------------------------------------------------------------------------------------------------
    def get_source_name(self):
        """
        Returns the current source file.

        :rtype str|None:
        """
        return self._filename

    # ------------------------------------------------------------------------------------------------------------------
    def next(self):
        """
        Yields the rows from the source files, one file after the other.

        A row is yielded as a dict (field name => value, missing values are filled with an empty string) when
        self._fields is set and as the plain list produced by csv.reader otherwise.
        """
        for self._filename in self._filenames:
            try:
                self._open()
                for row in self._csv_reader:
                    self._row_number += 1
                    if self._fields:
                        yield dict(zip_longest(self._fields, row, fillvalue=''))
                    else:
                        yield row
            finally:
                # Also runs when the consumer abandons the generator or an error occurs, so no file handle leaks.
                self._close()
            self._row_number = -1

        self._filename = None
        # Falling off the end terminates the generator. Raising StopIteration explicitly inside a generator is
        # converted into a RuntimeError since Python 3.7 (PEP 479).

    # ------------------------------------------------------------------------------------------------------------------
    def _open_file(self, mode, encoding=None):
        """
        Opens the current file and stores the file object in self._file.

        Files ending on '.bz2' are opened with bz2, files ending on '.gz' with gzip, and all other files with the
        built-in open().

        :param str mode: The mode for opening the file ('rb' or 'rt').
        :param str|None encoding: The encoding of the file. Must be None in binary mode.
        """
        # The csv module requires text files to be opened with newline='' (otherwise line endings are translated
        # before the parser and the line ending detection see them). Binary mode does not accept a newline argument.
        newline = '' if 't' in mode else None

        if self._filename.endswith('.bz2'):
            opener = bz2.open
        elif self._filename.endswith('.gz'):
            import gzip

            opener = gzip.open
        else:
            opener = open

        self._file = opener(self._filename, mode=mode, encoding=encoding, newline=newline)

    # ------------------------------------------------------------------------------------------------------------------
    def _close(self):
        """
        Closes the current file. Does nothing when no file is open.
        """
        if self._file is not None:
            self._file.close()
            self._file = None

    # ------------------------------------------------------------------------------------------------------------------
    def _get_sample(self, mode, encoding):
        """
        Reads a sample of at most sample_size bytes or characters from the current file into self._sample.

        :param str mode: The mode for opening the file.
        :param str|None encoding: The encoding of the file. None for open the file in binary mode.
        """
        self._open_file(mode, encoding)
        try:
            self._sample = self._file.read(self.sample_size)
        finally:
            # Close the file even when reading (e.g. decoding) fails.
            self._close()

    # ------------------------------------------------------------------------------------------------------------------
    @staticmethod
    def _most_frequent(sample, candidates, default):
        """
        Returns the candidate that occurs most often in a sample.

        :param str sample: The sample to search.
        :param list[str] candidates: The candidates. On equal counts the earlier candidate wins.
        :param str default: The value returned when none of the candidates occurs in the sample.

        :rtype: str
        """
        best_value = default
        best_count = 0
        for candidate in candidates:
            count = sample.count(candidate)
            if count > best_count:
                best_value = candidate
                best_count = count

        return best_value

    # ------------------------------------------------------------------------------------------------------------------
    def _detect_encoding(self):
        """
        Detects the encoding of the current file from the (binary) sample and stores it under key 'encoding' of the
        formatting parameters.
        """
        self._formatting_parameters['encoding'] = chardet.detect(self._sample)['encoding']

    # ------------------------------------------------------------------------------------------------------------------
    def _detect_delimiter(self):
        """
        Detects the field delimiter in the (text) sample and stores it under key 'delimiter' of the formatting
        parameters. Falls back to ',' when no candidate delimiter occurs in the sample.
        """
        self._formatting_parameters['delimiter'] = self._most_frequent(self._sample, self.delimiters, ',')

    # ------------------------------------------------------------------------------------------------------------------
    def _detect_line_ending(self):
        """
        Detects the line ending in the (text) sample and stores it under key 'line_terminator' of the formatting
        parameters. Falls back to '\\n' when no candidate line ending occurs in the sample.
        """
        self._formatting_parameters['line_terminator'] = self._most_frequent(self._sample, self.line_endings, '\n')

    # ------------------------------------------------------------------------------------------------------------------
    def _open(self):
        """
        Opens the current file with proper settings for encoding and delimiter and creates the CSV reader.

        The formatting parameters start with defaults, are adjusted by pass1() of the helper, every parameter that is
        still 'auto' is detected from a sample of the file, and the result is finally passed to pass2() of the helper.
        """
        self._sample = None

        defaults = {'encoding':        'auto',
                    'delimiter':       'auto',
                    'line_terminator': 'auto',
                    'escape_char':     '\\',
                    'quote_char':      '"'}
        formatting_parameters1 = self._helper.pass1(self._filename, defaults)
        # Both names refer to the same dict: values stored by the _detect_* methods are visible through
        # formatting_parameters1 as well.
        self._formatting_parameters = formatting_parameters1

        # Detect encoding.
        if formatting_parameters1['encoding'] == 'auto':
            self._get_sample('rb', None)
            self._detect_encoding()
            # The binary sample cannot be searched for str delimiters or line endings: force a fresh text sample.
            self._sample = None

        # Detect delimiter.
        if formatting_parameters1['delimiter'] == 'auto':
            self._get_sample('rt', formatting_parameters1['encoding'])
            self._detect_delimiter()

        # Detect line terminators.
        if formatting_parameters1['line_terminator'] == 'auto':
            if self._sample is None:
                self._get_sample('rt', formatting_parameters1['encoding'])
            self._detect_line_ending()

        self._formatting_parameters = self._helper.pass2(self._filename,
                                                         self._formatting_parameters,
                                                         formatting_parameters1)

        # NOTE(review): the file is opened with the encoding from before pass2(); an encoding changed by pass2() is
        # not used here -- confirm this is intended.
        self._open_file('rt', formatting_parameters1['encoding'])
        self._csv_reader = csv.reader(self._file,
                                      delimiter=self._formatting_parameters['delimiter'],
                                      escapechar=self._formatting_parameters['escape_char'],
                                      lineterminator=self._formatting_parameters['line_terminator'],  # Ignored
                                      quotechar=self._formatting_parameters['quote_char'])

        self._sample = None
237
238
# ----------------------------------------------------------------------------------------------------------------------
239