Completed
Push — master ( e0da72...49d54f )
by Philip
25s
created

TestTextReader   A

Complexity

Total Complexity 6

Size/Duplication

Total Lines 21
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 21
rs 10
wmc 6

3 Methods

Rating   Name   Duplication   Size   Complexity  
A tearDownClass() 0 4 2
A setUpClass() 0 7 3
A test_text_reader() 0 7 1
1
import csv
0 ignored issues
show
Coding Style introduced by
This module should have a docstring.

The coding style of this project requires that you add a docstring to this code element. Below, you find an example for methods:

class SomeClass:
    def some_method(self):
        """Do x and return foo."""

If you would like to know more about docstrings, we recommend to read PEP-257: Docstring Conventions.

Loading history...
2
import io
3
import os
4
import unittest
5
import textwrap
6
import zipfile
7
from collections import namedtuple
8
from datetime import date, datetime
9
from tempfile import NamedTemporaryFile
10
11
from foil.fileio import (concatenate_streams, DelimitedReader,
12
                         DelimitedSubsetReader, TextReader, ZipReader)
13
14
15
class MockDialect(csv.Dialect):
0 ignored issues
show
Coding Style introduced by
This class should have a docstring.

The coding style of this project requires that you add a docstring to this code element. Below, you find an example for methods:

class SomeClass:
    def some_method(self):
        """Do x and return foo."""

If you would like to know more about docstrings, we recommend to read PEP-257: Docstring Conventions.

Loading history...
16
    delimiter = '|'
17
    quotechar = '"'
18
    doublequote = True
19
    skipinitialspace = False
20
    lineterminator = '\n'
21
    quoting = csv.QUOTE_MINIMAL
22
23
24
def delimited_text():
0 ignored issues
show
Coding Style introduced by
This function should have a docstring.

The coding style of this project requires that you add a docstring to this code element. Below, you find an example for methods:

class SomeClass:
    def some_method(self):
        """Do x and return foo."""

If you would like to know more about docstrings, we recommend to read PEP-257: Docstring Conventions.

Loading history...
25
    file_content = textwrap.dedent(r"""
26
"NAME"|"CLASS"|"DATE"|"ASSIGNMENT"|"SCORE"|"AVERAGE"
27
"Dave"|"American History"|2015-03-02|"QUIZ 1"|82|82.0
28
"Dave"|"American History"|2015-04-04|"QUIZ 2"|91|86.5
29
"Dave"|"American History"|2015-04-20|"Mid-term"|77|83.333
30
""").strip()
31
    return file_content
32
33
34
def data_records(fields):
0 ignored issues
show
Coding Style introduced by
This function should have a docstring.

The coding style of this project requires that you add a docstring to this code element. Below, you find an example for methods:

class SomeClass:
    def some_method(self):
        """Do x and return foo."""

If you would like to know more about docstrings, we recommend to read PEP-257: Docstring Conventions.

Loading history...
35
    Record = namedtuple('Record', fields)
0 ignored issues
show
Coding Style Naming introduced by
The name Record does not conform to the variable naming conventions ([a-z_][a-z0-9_]{2,30}$).

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
36
    records = [
37
        Record('Dave', 'American History', date(2015, 3, 2), 'QUIZ 1', 82.0, 82.0),
38
        Record('Dave', 'American History', date(2015, 4, 4), 'QUIZ 2', 91.0, 86.5),
39
        Record('Dave', 'American History', date(2015, 4, 20), 'Mid-term', 77.0, 83.333)]
40
41
    return records
42
43
44
def partial_data_records(fields):
0 ignored issues
show
Coding Style introduced by
This function should have a docstring.

The coding style of this project requires that you add a docstring to this code element. Below, you find an example for methods:

class SomeClass:
    def some_method(self):
        """Do x and return foo."""

If you would like to know more about docstrings, we recommend to read PEP-257: Docstring Conventions.

Loading history...
45
    Record = namedtuple('Record', fields)
0 ignored issues
show
Coding Style Naming introduced by
The name Record does not conform to the variable naming conventions ([a-z_][a-z0-9_]{2,30}$).

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
46
    records = [
47
        Record('Dave', 'American History', 82.0),
48
        Record('Dave', 'American History', 86.5),
49
        Record('Dave', 'American History', 83.333)]
50
51
    return records
52
53
54
def single_field_records(fields):
55
    Record = namedtuple('Record', fields)
0 ignored issues
show
Coding Style Naming introduced by
The name Record does not conform to the variable naming conventions ([a-z_][a-z0-9_]{2,30}$).

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
56
    records = [Record('Dave'), Record('Dave'), Record('Dave')]
57
58
    return records
59
60
61
def parse_date(date_str):
62
    if date_str is '':
63
        return None
64
    else:
65
        return datetime.strptime(date_str, '%Y-%m-%d').date()
66
67
68
class TestTextReader(unittest.TestCase):
69
    @classmethod
70
    def setUpClass(cls):
71
        file_content = 'hello\nworld\n'
72
        with NamedTemporaryFile(prefix='text_', suffix='.txt', delete=False) as tmp:
73
            with open(tmp.name, 'w', encoding='UTF-8') as text_file:
74
                text_file.write(file_content)
75
            cls.path = tmp.name
76
77
    def test_text_reader(self):
78
        reader = TextReader(self.path, 'UTF-8')
79
80
        expected = ['hello', 'world']
81
        result = list(reader)
82
83
        self.assertEqual(expected, result)
84
85
    @classmethod
86
    def tearDownClass(cls):
87
        if os.path.exists(cls.path):
88
            os.unlink(cls.path)
89
90
91
class TestDelimitedReader(unittest.TestCase):
92
93
    encoding = 'UTF-8'
94
    zip_filename = 'delimited_file1.txt'
95
96 View Code Duplication
    @classmethod
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
97
    def setUpClass(cls):
98
        file_content = delimited_text()
99
        with NamedTemporaryFile(prefix='delim_', suffix='.txt', delete=False) as tmp:
100
            with open(tmp.name, 'w', encoding='UTF-8') as text_file:
101
                text_file.write(file_content)
102
            cls.path = tmp.name
103
104
        file_content_bytes = bytes(file_content, cls.encoding)
105
        with NamedTemporaryFile(prefix='zipped_', suffix='.zip', delete=False) as tmp:
106
            with zipfile.ZipFile(tmp.name, mode='w') as myzip:
107
                myzip.writestr(cls.zip_filename, file_content_bytes)
108
            cls.zip_path = tmp.name
109
110
    def setUp(self):
111
        self.maxDiff = None
0 ignored issues
show
Coding Style Naming introduced by
The name maxDiff does not conform to the attribute naming conventions ([a-z_][a-z0-9_]{2,30}$).

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
112
        self.fields = ['NAME', 'CLASS', 'DATE', 'ASSIGNMENT', 'SCORE', 'AVERAGE']
113
        self.converters = [str, str, parse_date, str, float, float]
114
        self.dialect = MockDialect()
115
        self.expected = data_records(self.fields)
116
117
    def test_stream_reader(self):
118
        stream = io.StringIO(delimited_text())
119
        reader = DelimitedReader(stream, dialect=self.dialect,
120
                                 fields=self.fields,
121
                                 converters=self.converters)
122
123
        result = list(reader)
124
125
        self.assertSequenceEqual(self.expected, result)
126
127
    def test_from_file(self):
128
        reader = DelimitedReader.from_file(path=self.path,
129
                                           encoding=self.encoding,
130
                                           dialect=self.dialect,
131
                                           fields=self.fields,
132
                                           converters=self.converters)
133
134
        result = list(reader)
135
136
        self.assertSequenceEqual(self.expected, result)
137
138
    def test_from_zipfile(self):
139
        reader = DelimitedReader.from_zipfile(path=self.zip_path,
140
                                              filename=self.zip_filename,
141
                                              encoding=self.encoding,
142
                                              dialect=self.dialect,
143
                                              fields=self.fields,
144
                                              converters=self.converters)
145
146
        result = list(reader)
147
148
        self.assertSequenceEqual(self.expected, result)
149
150
    def test_line_number(self):
151
        # counts skipped header line
152
        stream = io.StringIO(delimited_text())
153
        reader = DelimitedReader(stream, dialect=self.dialect,
154
                                 fields=self.fields,
155
                                 converters=self.converters)
156
        next(reader)
157
        next(reader)
158
        self.assertEqual(reader.file_line_number, 3)
159
160
    @classmethod
161
    def tearDownClass(cls):
162
        if os.path.exists(cls.path):
163
            os.unlink(cls.path)
164
165
        if os.path.exists(cls.zip_path):
166
            os.unlink(cls.zip_path)
167
168
169
class TestDelimitedSubsetReader(unittest.TestCase):
170
171
    encoding = 'UTF-8'
172
    zip_filename = 'delimited_file1.txt'
173
174 View Code Duplication
    @classmethod
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
175
    def setUpClass(cls):
176
        file_content = delimited_text()
177
        with NamedTemporaryFile(prefix='delim_', suffix='.txt', delete=False) as tmp:
178
            with open(tmp.name, 'w', encoding='UTF-8') as text_file:
179
                text_file.write(file_content)
180
            cls.path = tmp.name
181
182
        file_content_bytes = bytes(file_content, cls.encoding)
183
        with NamedTemporaryFile(prefix='zipped_', suffix='.zip', delete=False) as tmp:
184
            with zipfile.ZipFile(tmp.name, mode='w') as myzip:
185
                myzip.writestr(cls.zip_filename, file_content_bytes)
186
            cls.zip_path = tmp.name
187
188
    def setUp(self):
189
        self.maxDiff = None
0 ignored issues
show
Coding Style Naming introduced by
The name maxDiff does not conform to the attribute naming conventions ([a-z_][a-z0-9_]{2,30}$).

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
190
        self.headers = ['NAME', 'CLASS', 'DATE', 'ASSIGNMENT', 'SCORE', 'AVERAGE']
191
        self.fields = ['NAME', 'CLASS', 'AVERAGE']
192
        self.field_index = [self.headers.index(field) for field in self.fields]
193
        self.converters = [str, str, float]
194
        self.dialect = MockDialect()
195
        self.expected = partial_data_records(self.fields)
196
197
    def test_stream_reader(self):
198
        stream = io.StringIO(delimited_text())
199
        reader = DelimitedSubsetReader(stream,
200
                                       dialect=self.dialect,
201
                                       fields=self.fields,
202
                                       converters=self.converters,
203
                                       field_index=self.field_index)
204
205
        result = list(reader)
206
207
        self.assertSequenceEqual(self.expected, result)
208
209
    def test_from_file(self):
210
        reader = DelimitedSubsetReader.from_file(path=self.path,
211
                                                 encoding=self.encoding,
212
                                                 dialect=self.dialect,
213
                                                 fields=self.fields,
214
                                                 converters=self.converters,
215
                                                 field_index=self.field_index)
216
217
        result = list(reader)
218
219
        self.assertSequenceEqual(self.expected, result)
220
221
    def test_from_zipfile(self):
222
        reader = DelimitedSubsetReader.from_zipfile(path=self.zip_path,
223
                                                    filename=self.zip_filename,
224
                                                    encoding=self.encoding,
225
                                                    dialect=self.dialect,
226
                                                    fields=self.fields,
227
                                                    converters=self.converters,
228
                                                    field_index=self.field_index)
229
230
        result = list(reader)
231
232
        self.assertSequenceEqual(self.expected, result)
233
234
    def test_single_field(self):
235
        stream = io.StringIO(delimited_text())
236
        field_index = [0]
237
        fields = [self.fields[index] for index in field_index]
238
        converters = [self.converters[index] for index in field_index]
239
240
        reader = DelimitedSubsetReader(stream,
241
                                       dialect=self.dialect,
242
                                       fields=fields,
243
                                       converters=converters,
244
                                       field_index=field_index)
245
246
        expected = single_field_records(fields)
247
        result = list(reader)
248
249
        self.assertEqual(expected, result)
250
251
    @classmethod
252
    def tearDownClass(cls):
253
        if os.path.exists(cls.path):
254
            os.unlink(cls.path)
255
256
        if os.path.exists(cls.zip_path):
257
            os.unlink(cls.zip_path)
258
259
260
class TestZipReader(unittest.TestCase):
261
262
    encoding = 'UTF-8'
263
    filename = 'sample_file.txt'
264
    file_content = 'abc\neasy\n123.\n'
265
    file_content_bytes = bytes(file_content, encoding=encoding)
266
267
    @classmethod
268
    def setUpClass(cls):
269
        with NamedTemporaryFile(prefix='zipped_', suffix='.zip', delete=False) as tmp:
270
            with zipfile.ZipFile(tmp.name, mode='w') as myzip:
271
                    myzip.writestr(cls.filename, cls.file_content_bytes)
0 ignored issues
show
Coding Style introduced by
The indentation here looks off. 16 spaces were expected, but 20 were found.
Loading history...
272
            cls.path = tmp.name
273
274
    def test_read(self):
275
        result = ZipReader(self.path, self.filename).read(self.encoding)
276
277
        self.assertEqual(self.file_content, result)
278
279
    def test_readlines(self):
280
        line_gen = ZipReader(self.path, self.filename).readlines(self.encoding)
281
        expected = ['abc', 'easy', '123.']
282
283
        self.assertEqual(expected, list(line_gen))
284
285
    @classmethod
286
    def tearDownClass(cls):
287
        if os.path.exists(cls.path):
288
            os.unlink(cls.path)
289
290
291
class TestConcatenateStreams(unittest.TestCase):
292
    def test_concatenate_streams(self):
293
        streams = [[1, 2, 3], ['a', 'b', 'c']]
294
295
        expected = [1, 2, 3, 'a', 'b', 'c']
296
        result = list(concatenate_streams(streams))
297
298
        self.assertEqual(expected, result)
299