Completed
Push — master ( c15941...e0da72 )
by Philip
01:29
created

TestConcatenateStreams.test_concatenate_streams()   A

Complexity

Conditions 1

Size

Total Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 7
rs 9.4285
1
import csv
2
import io
3
import os
4
import unittest
5
import textwrap
6
import zipfile
7
from collections import namedtuple
8
from datetime import date, datetime
9
from tempfile import NamedTemporaryFile
10
11
from foil.fileio import (concatenate_streams, DelimitedReader,
12
                         DelimitedSubsetReader, ZipReader)
13
14
15
# Path of the 'sample_data' directory that sits next to this module.
sample_path = os.path.join(os.path.dirname(__file__), 'sample_data')
16
17
18
class MockDialect(csv.Dialect):
    """Pipe-delimited ``csv`` dialect with minimal, double-quote based quoting."""

    # How fields and rows are separated.
    delimiter = '|'
    lineterminator = '\n'
    skipinitialspace = False

    # How field values are quoted.
    quoting = csv.QUOTE_MINIMAL
    quotechar = '"'
    doublequote = True
25
26
27
def delimited_text():
    """Return the pipe-delimited sample text: a header row plus three data rows.

    The blank lines surrounding the literal are stripped, so the result
    starts with the header row and carries no trailing newline.
    """
    raw_text = r"""
"NAME"|"CLASS"|"DATE"|"ASSIGNMENT"|"SCORE"|"AVERAGE"
"Dave"|"American History"|2015-03-02|"QUIZ 1"|82|82.0
"Dave"|"American History"|2015-04-04|"QUIZ 2"|91|86.5
"Dave"|"American History"|2015-04-20|"Mid-term"|77|83.333
"""
    return textwrap.dedent(raw_text).strip()
35
36
37
def data_records(fields):
    """Build the expected full-width records for the sample data.

    Args:
        fields: Six field names used to define the ``Record`` namedtuple.

    Returns:
        A list of three ``Record`` instances, one per sample data row.
    """
    Record = namedtuple('Record', fields)
    rows = (
        ('Dave', 'American History', date(2015, 3, 2), 'QUIZ 1', 82.0, 82.0),
        ('Dave', 'American History', date(2015, 4, 4), 'QUIZ 2', 91.0, 86.5),
        ('Dave', 'American History', date(2015, 4, 20), 'Mid-term', 77.0, 83.333),
    )

    return [Record(*row) for row in rows]
45
46
47
def partial_data_records(fields):
    """Build the expected three-column records for the sample data.

    Args:
        fields: Three field names used to define the ``Record`` namedtuple.

    Returns:
        A list of three ``Record`` instances that differ only in their
        last value.
    """
    Record = namedtuple('Record', fields)
    averages = (82.0, 86.5, 83.333)

    return [Record('Dave', 'American History', average) for average in averages]
55
56
57
def single_field_records(fields):
    """Build the expected one-column records for the sample data.

    Args:
        fields: A single field name used to define the ``Record`` namedtuple.

    Returns:
        A list of three ``Record('Dave')`` instances.
    """
    Record = namedtuple('Record', fields)

    return [Record('Dave') for _ in range(3)]
62
63
64
def parse_date(date_str):
    """Convert a ``YYYY-MM-DD`` string into a ``datetime.date``.

    Args:
        date_str: Date text in ``%Y-%m-%d`` format, or an empty value.

    Returns:
        ``None`` when ``date_str`` is empty (``''`` or ``None``), otherwise
        the parsed ``date``.

    Raises:
        ValueError: If a non-empty ``date_str`` does not match ``%Y-%m-%d``.
    """
    # Test emptiness by value: ``is ''`` compares object identity, which only
    # works by accident of string interning and warns on Python 3.8+.
    if not date_str:
        return None

    return datetime.strptime(date_str, '%Y-%m-%d').date()
69
70
71
class TestDelimitedReader(unittest.TestCase):
    """Exercise ``DelimitedReader`` against a stream, a text file and a zip member.

    ``setUpClass`` writes the text from ``delimited_text()`` to a temporary
    text file and to a member of a temporary zip archive; ``tearDownClass``
    removes both files.
    """

    encoding = 'UTF-8'
    zip_filename = 'delimited_file1.txt'

    @classmethod
    def setUpClass(cls):
        """Create the temporary text file and zip archive shared by the tests."""
        file_content = delimited_text()

        # delete=False keeps the file on disk after the ``with`` block so the
        # tests can reopen it by name; tearDownClass removes it.
        with NamedTemporaryFile(prefix='delim_', suffix='.txt', delete=False) as tmp:
            # Write with the same encoding the readers are given, instead of a
            # separate hard-coded literal, so the two cannot drift apart.
            with open(tmp.name, 'w', encoding=cls.encoding) as text_file:
                text_file.write(file_content)
            cls.path = tmp.name

        file_content_bytes = bytes(file_content, cls.encoding)
        with NamedTemporaryFile(prefix='zipped_', suffix='.zip', delete=False) as tmp:
            with zipfile.ZipFile(tmp.name, mode='w') as myzip:
                myzip.writestr(cls.zip_filename, file_content_bytes)
            cls.zip_path = tmp.name

    @classmethod
    def tearDownClass(cls):
        """Remove the temporary files created in ``setUpClass``."""
        if os.path.exists(cls.path):
            os.unlink(cls.path)

        if os.path.exists(cls.zip_path):
            os.unlink(cls.zip_path)

    def setUp(self):
        """Prepare the reader arguments and the expected records."""
        self.maxDiff = None
        self.fields = ['NAME', 'CLASS', 'DATE', 'ASSIGNMENT', 'SCORE', 'AVERAGE']
        # One converter per field, in the same order as ``self.fields``.
        self.converters = [str, str, parse_date, str, float, float]
        self.dialect = MockDialect()
        self.expected = data_records(self.fields)

    def test_stream_reader(self):
        """Reading from an in-memory text stream yields the expected records."""
        stream = io.StringIO(delimited_text())
        reader = DelimitedReader(stream, dialect=self.dialect,
                                 fields=self.fields,
                                 converters=self.converters)

        result = list(reader)

        self.assertSequenceEqual(self.expected, result)

    def test_from_file(self):
        """``from_file`` reads the temporary text file into the expected records."""
        reader = DelimitedReader.from_file(path=self.path,
                                           encoding=self.encoding,
                                           dialect=self.dialect,
                                           fields=self.fields,
                                           converters=self.converters)

        result = list(reader)

        self.assertSequenceEqual(self.expected, result)

    def test_from_zipfile(self):
        """``from_zipfile`` reads the zip member into the expected records."""
        reader = DelimitedReader.from_zipfile(path=self.zip_path,
                                              filename=self.zip_filename,
                                              encoding=self.encoding,
                                              dialect=self.dialect,
                                              fields=self.fields,
                                              converters=self.converters)

        result = list(reader)

        self.assertSequenceEqual(self.expected, result)

    def test_line_number(self):
        """``file_line_number`` is 3 after two records have been read."""
        # counts skipped header line
        stream = io.StringIO(delimited_text())
        reader = DelimitedReader(stream, dialect=self.dialect,
                                 fields=self.fields,
                                 converters=self.converters)
        next(reader)
        next(reader)
        self.assertEqual(reader.file_line_number, 3)
147
148
149
class TestDelimitedSubsetReader(unittest.TestCase):
    """Exercise ``DelimitedSubsetReader`` with a subset of the file's columns.

    ``setUpClass`` writes the text from ``delimited_text()`` to a temporary
    text file and to a member of a temporary zip archive; ``tearDownClass``
    removes both files.
    """

    encoding = 'UTF-8'
    zip_filename = 'delimited_file1.txt'

    @classmethod
    def setUpClass(cls):
        """Create the temporary text file and zip archive shared by the tests."""
        file_content = delimited_text()

        # delete=False keeps the file on disk after the ``with`` block so the
        # tests can reopen it by name; tearDownClass removes it.
        with NamedTemporaryFile(prefix='delim_', suffix='.txt', delete=False) as tmp:
            # Write with the same encoding the readers are given, instead of a
            # separate hard-coded literal, so the two cannot drift apart.
            with open(tmp.name, 'w', encoding=cls.encoding) as text_file:
                text_file.write(file_content)
            cls.path = tmp.name

        file_content_bytes = bytes(file_content, cls.encoding)
        with NamedTemporaryFile(prefix='zipped_', suffix='.zip', delete=False) as tmp:
            with zipfile.ZipFile(tmp.name, mode='w') as myzip:
                myzip.writestr(cls.zip_filename, file_content_bytes)
            cls.zip_path = tmp.name

    @classmethod
    def tearDownClass(cls):
        """Remove the temporary files created in ``setUpClass``."""
        if os.path.exists(cls.path):
            os.unlink(cls.path)

        if os.path.exists(cls.zip_path):
            os.unlink(cls.zip_path)

    def setUp(self):
        """Prepare the subset fields, their column positions and expected records."""
        self.maxDiff = None
        # Every column of the file, in file order.
        self.headers = ['NAME', 'CLASS', 'DATE', 'ASSIGNMENT', 'SCORE', 'AVERAGE']
        # The columns the reader should keep.
        self.fields = ['NAME', 'CLASS', 'AVERAGE']
        # Position of each kept column within ``self.headers``.
        self.field_index = [self.headers.index(field) for field in self.fields]
        # One converter per kept field, in the same order as ``self.fields``.
        self.converters = [str, str, float]
        self.dialect = MockDialect()
        self.expected = partial_data_records(self.fields)

    def test_stream_reader(self):
        """Reading from an in-memory text stream yields the expected subset."""
        stream = io.StringIO(delimited_text())
        reader = DelimitedSubsetReader(stream,
                                       dialect=self.dialect,
                                       fields=self.fields,
                                       converters=self.converters,
                                       field_index=self.field_index)

        result = list(reader)

        self.assertSequenceEqual(self.expected, result)

    def test_from_file(self):
        """``from_file`` reads the temporary text file into the expected subset."""
        reader = DelimitedSubsetReader.from_file(path=self.path,
                                                 encoding=self.encoding,
                                                 dialect=self.dialect,
                                                 fields=self.fields,
                                                 converters=self.converters,
                                                 field_index=self.field_index)

        result = list(reader)

        self.assertSequenceEqual(self.expected, result)

    def test_from_zipfile(self):
        """``from_zipfile`` reads the zip member into the expected subset."""
        reader = DelimitedSubsetReader.from_zipfile(path=self.zip_path,
                                                    filename=self.zip_filename,
                                                    encoding=self.encoding,
                                                    dialect=self.dialect,
                                                    fields=self.fields,
                                                    converters=self.converters,
                                                    field_index=self.field_index)

        result = list(reader)

        self.assertSequenceEqual(self.expected, result)

    def test_single_field(self):
        """Selecting only the first column yields one-field records."""
        stream = io.StringIO(delimited_text())
        field_index = [0]
        # field_index holds column positions in the file, so resolve the names
        # against the full header row rather than the subset list (the two
        # only happen to agree at position 0).
        fields = [self.headers[index] for index in field_index]
        # Converters are aligned with self.fields, so look them up by name.
        converter_for = dict(zip(self.fields, self.converters))
        converters = [converter_for[name] for name in fields]

        reader = DelimitedSubsetReader(stream,
                                       dialect=self.dialect,
                                       fields=fields,
                                       converters=converters,
                                       field_index=field_index)

        expected = single_field_records(fields)
        result = list(reader)

        self.assertEqual(expected, result)
238
239
240
class TestZipReader(unittest.TestCase):
    """Exercise ``ZipReader.read`` and ``ZipReader.readlines`` on a temporary archive."""

    encoding = 'UTF-8'
    filename = 'sample_file.txt'
    file_content = 'abc\neasy\n123.\n'
    file_content_bytes = file_content.encode(encoding)

    @classmethod
    def setUpClass(cls):
        """Write a one-member zip archive to a temporary file kept on disk."""
        with NamedTemporaryFile(prefix='zipped_', suffix='.zip', delete=False) as archive_file:
            with zipfile.ZipFile(archive_file.name, mode='w') as archive:
                archive.writestr(cls.filename, cls.file_content_bytes)
            cls.path = archive_file.name

    @classmethod
    def tearDownClass(cls):
        """Delete the temporary archive if it is still present."""
        if os.path.exists(cls.path):
            os.unlink(cls.path)

    def test_read(self):
        """``read`` returns the member's full text."""
        zip_reader = ZipReader(self.path, self.filename)

        self.assertEqual(self.file_content, zip_reader.read(self.encoding))

    def test_readlines(self):
        """``readlines`` yields each line without its trailing newline."""
        zip_reader = ZipReader(self.path, self.filename)

        lines = list(zip_reader.readlines(self.encoding))

        self.assertEqual(['abc', 'easy', '123.'], lines)
269
270
271
class TestConcatenateStreams(unittest.TestCase):
    """Check ``concatenate_streams`` on two short lists."""

    def test_concatenate_streams(self):
        """Items of the first stream come out first, then those of the second."""
        numbers = [1, 2, 3]
        letters = ['a', 'b', 'c']
        expected = [1, 2, 3, 'a', 'b', 'c']

        combined = list(concatenate_streams([numbers, letters]))

        self.assertEqual(expected, combined)
279