UniversalCsvReader._detect_encoding()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 5
ccs 0
cts 4
cp 0
crap 2
rs 10
c 0
b 0
f 0
1
import bz2
2
import copy
3
import csv
4
from itertools import zip_longest
5
from typing import Dict, List, Optional, Union
6
7
import chardet
8
9
from etlt.reader.Reader import Reader
10
from etlt.reader.UniversalCsvReaderFormatHelper import UniversalCsvReaderFormatHelper
11
12
13
class UniversalCsvReader(Reader):
    """
    A universal CSV file reader.
    - Open uncompressed and gz, bz2 compressed files.
    - Auto encoding and field delimiter detection.
    """
    # Maximum amount of data (bytes in binary mode, characters in text mode) read as sample for auto-detection.
    sample_size = 64 * 1024

    # Candidate line endings. Multi-character candidates come first: on equal counts the earliest candidate wins.
    line_endings = ['\r\n', '\n\r', '\n', '\r']

    # Candidate field delimiters. On equal counts the earliest candidate wins.
    delimiters = [',', ';', '\t', '|', ':']

    # ------------------------------------------------------------------------------------------------------------------
    def __init__(self, filenames: List[str], format_helper: Optional[UniversalCsvReaderFormatHelper] = None):
        """
        Object constructor.

        :param list(str) filenames: A list of CSV file names.
        :param format_helper: The helper for determining the formatting parameters of each file. When omitted (or
                              falsy) a default UniversalCsvReaderFormatHelper is used.
        """
        Reader.__init__(self)

        self._filenames: List[str] = filenames
        """
        The name of the CSV files.
        """

        self._file = None
        """
        The current actual file object.
        """

        self._csv_reader = None
        """
        The current actual CSV file object.

        :type: _csv.reader
        """

        self._mapping: Optional[Dict[str, int]] = None
        """
        The mapping from column names to column numbers.
        """

        self._filename: Optional[str] = None
        """
        The name of the current file.
        """

        self._helper = format_helper or UniversalCsvReaderFormatHelper()
        """
        The helper for detecting the appropriate formatting parameters for reading the current CSV file.
        """

        self._formatting_parameters: Dict[str, str] = dict()
        """
        The CSV formatting parameters for reading the current CSV file.
        """

        self._sample: Optional[Union[str, bytes]] = None
        """
        The sample when detecting automatically formatting parameters.
        """

    # ------------------------------------------------------------------------------------------------------------------
    def __enter__(self) -> 'UniversalCsvReader':
        """
        Enters the runtime context. No resources are acquired here; files are opened lazily by next().

        :return: This reader, so that ``with UniversalCsvReader(...) as reader:`` binds a usable object.
        """
        return self

    # ------------------------------------------------------------------------------------------------------------------
    def __exit__(self, *_):
        """
        Leaves the runtime context and closes the current file (if any). Exceptions are not suppressed.
        """
        self._close()

    # ------------------------------------------------------------------------------------------------------------------
    def get_source_name(self) -> Optional[str]:
        """
        Returns the current source file.
        """
        return self._filename

    # ------------------------------------------------------------------------------------------------------------------
    def next(self):
        """
        Yields the next row from the source files.

        Each row is yielded as a dict keyed by column name when a mapping is set, else as a dict keyed by the names
        in self._fields when that is set, else as the raw list returned by the CSV reader.
        """
        # NOTE(review): self._row_number and self._fields are not defined in this class; presumably they are
        # provided by Reader -- confirm.
        try:
            for self._filename in self._filenames:
                self._open()
                for row in self._csv_reader:
                    self._row_number += 1
                    if self._mapping:
                        # Column numbers outside the row yield an empty string.
                        yield {column_name: row[index] if 0 <= index < len(row) else ''
                               for column_name, index in self._mapping.items()}
                    elif self._fields:
                        yield dict(zip_longest(self._fields, row, fillvalue=''))
                    else:
                        yield row

                self._close()
                self._row_number = -1
        finally:
            # Also reached when the generator is closed early or an error occurs: never leak the file handle.
            self._close()

        self._filename = None

    # ------------------------------------------------------------------------------------------------------------------
    @property
    def mapping(self) -> Optional[Dict[str, int]]:
        """
        Getter for mapping. Returns a shallow copy, hence modifying the returned dict does not affect this reader.
        """
        return copy.copy(self._mapping)

    # ------------------------------------------------------------------------------------------------------------------
    @mapping.setter
    def mapping(self, mapping: Optional[Dict[str, int]]):
        """
        Setter for mapping.

        :param mapping: The mapping from column names to column numbers (0-based index in a row). The dict is
                        stored as is, it is not copied.
        """
        self._mapping = mapping

    # ------------------------------------------------------------------------------------------------------------------
    def _open_file(self, mode: str, encoding: Optional[str] = None) -> None:
        """
        Opens the next current file.

        Files with extension .bz2 or .gz are opened with the matching decompressor, all other files are opened as
        plain files.

        :param mode: The mode for opening the file.
        :param encoding: The encoding of the file.
        """
        open_kwargs = {'mode': mode, 'encoding': encoding}
        if 't' in mode:
            # Disable newline translation: the sample must contain the original line endings for detecting the line
            # terminator and the csv module requires files opened with newline=''.
            open_kwargs['newline'] = ''

        if self._filename.endswith('.bz2'):
            self._file = bz2.open(self._filename, **open_kwargs)
        elif self._filename.endswith('.gz'):
            import gzip

            self._file = gzip.open(self._filename, **open_kwargs)
        else:
            self._file = open(self._filename, **open_kwargs)

    # ------------------------------------------------------------------------------------------------------------------
    def _close(self) -> None:
        """
        Closes the current file. Does nothing when no file is open.
        """
        if self._file:
            self._file.close()

        # Drop the references such that a closed file is never reused.
        self._file = None
        self._csv_reader = None

    # ------------------------------------------------------------------------------------------------------------------
    def _get_sample(self, mode: str, encoding: Optional[str]) -> None:
        """
        Get a sample from the next current input file.

        :param str mode: The mode for opening the file.
        :param str|None encoding: The encoding of the file. None for open the file in binary mode.
        """
        self._open_file(mode, encoding)
        try:
            self._sample = self._file.read(UniversalCsvReader.sample_size)
        finally:
            self._close()

    # ------------------------------------------------------------------------------------------------------------------
    @staticmethod
    def _most_frequent(sample: str, candidates: List[str], default: str) -> str:
        """
        Returns the candidate with the highest number of occurrences in a sample.

        :param sample: The sample text.
        :param candidates: The candidates. On equal counts the candidate listed first wins.
        :param default: The value returned when none of the candidates occurs in the sample.
        """
        best_value = default
        best_count = 0
        for candidate in candidates:
            count = sample.count(candidate)
            if count > best_count:
                best_value = candidate
                best_count = count

        return best_value

    # ------------------------------------------------------------------------------------------------------------------
    def _detect_encoding(self) -> None:
        """
        Detects the encoding of the current file.

        The result of chardet for the current sample is stored as formatting parameter 'encoding'.
        """
        self._formatting_parameters['encoding'] = chardet.detect(self._sample)['encoding']

    # ------------------------------------------------------------------------------------------------------------------
    def _detect_delimiter(self) -> None:
        """
        Detects the field delimiter in the sample data.

        The most frequent candidate is stored as formatting parameter 'delimiter'; a comma when no candidate occurs.
        """
        self._formatting_parameters['delimiter'] = self._most_frequent(self._sample,
                                                                       UniversalCsvReader.delimiters,
                                                                       ',')

    # ------------------------------------------------------------------------------------------------------------------
    def _detect_line_ending(self) -> None:
        """
        Detects the line ending in the sample data.

        The most frequent candidate is stored as formatting parameter 'line_terminator'; a line feed when no
        candidate occurs.
        """
        self._formatting_parameters['line_terminator'] = self._most_frequent(self._sample,
                                                                             UniversalCsvReader.line_endings,
                                                                             '\n')

    # ------------------------------------------------------------------------------------------------------------------
    def _open(self) -> None:
        """
        Opens the next current file with proper settings for encoding and delimiter.
        """
        self._sample = None

        formatting_parameters0 = {
            'encoding':        'auto',
            'delimiter':       'auto',
            'line_terminator': 'auto',
            'escape_char':     '\\',
            'quote_char':      '"'}
        formatting_parameters1 = self._helper.pass1(self._filename, formatting_parameters0)

        # Both names refer to the same dict: values stored by the _detect_* methods are visible through
        # formatting_parameters1 too (the code below relies on this for the detected encoding).
        self._formatting_parameters = formatting_parameters1

        # Detect encoding.
        if formatting_parameters1['encoding'] == 'auto':
            self._get_sample('rb', None)
            self._detect_encoding()

        # Detect delimiter.
        if formatting_parameters1['delimiter'] == 'auto':
            self._get_sample('rt', formatting_parameters1['encoding'])
            self._detect_delimiter()

        # Detect line terminators.
        if formatting_parameters1['line_terminator'] == 'auto':
            # A sample taken in binary mode for detecting the encoding cannot be searched for str line endings,
            # hence take a text sample unless a non-empty one is available already.
            if not self._sample or not isinstance(self._sample, str):
                self._get_sample('rt', formatting_parameters1['encoding'])
            self._detect_line_ending()

        self._formatting_parameters = self._helper.pass2(self._filename,
                                                         self._formatting_parameters,
                                                         formatting_parameters1)

        self._open_file('rt', formatting_parameters1['encoding'])
        self._csv_reader = csv.reader(self._file,
                                      delimiter=self._formatting_parameters['delimiter'],
                                      escapechar=self._formatting_parameters['escape_char'],
                                      lineterminator=self._formatting_parameters['line_terminator'],  # Ignored
                                      quotechar=self._formatting_parameters['quote_char'])

        self._sample = None
246
247
# ----------------------------------------------------------------------------------------------------------------------
248