Completed
Pull Request — master (#836)
by
unknown
01:28
created

tests.pipeline.AdjustedArrayTestCase   A

Complexity

Total Complexity 19

Size/Duplication

Total Lines 109
Duplicated Lines 0 %
Metric Value
dl 0
loc 109
rs 10
wmc 19

7 Methods

Rating   Name   Duplication   Size   Complexity  
A test_multiplicative_adjustments() 0 16 3
A test_array_views_arent_writable() 0 8 3
A test_invalid_lookback() 0 13 4
A test_no_adjustments() 0 16 3
A test_overwrite_adjustment_cases() 0 16 3
B test_inspect() 0 25 1
A test_bad_input() 0 7 2
1
"""
2
Tests for chunked adjustments.
3
"""
4
from textwrap import dedent
5
from unittest import TestCase
6
7
from nose_parameterized import parameterized
8
from numpy import (
9
    arange,
10
    array,
11
    full,
12
)
13
from numpy.testing import assert_array_equal
14
from six.moves import zip_longest
15
16
from zipline.lib.adjustment import (
17
    Float64Multiply,
18
    Float64Overwrite,
19
)
20
from zipline.lib.adjusted_array import (
21
    adjusted_array,
22
    NOMASK,
23
)
24
from zipline.errors import (
25
    WindowLengthNotPositive,
26
    WindowLengthTooLong,
27
)
28
29
30
def num_windows_of_length_M_on_buffers_of_length_N(M, N):
    """
    Count the distinct length-M windows that fit in a length-N buffer.

    A window may start at any offset from 0 through N - M inclusive, which
    gives (N - M) + 1 possible positions.

    Example:
    With N=4 rows and windows of length M=2 there are 3 legal windows:
    data[0:2], data[1:3], and data[2:4].
    """
    last_start_offset = N - M
    return last_start_offset + 1
40
41
42
def valid_window_lengths(underlying_buffer_length):
    """
    Return an iterator over every legal window length for a buffer.

    The iterator produces the integers 1 through underlying_buffer_length,
    inclusive.
    """
    shortest = 1
    stop = underlying_buffer_length + 1  # range() excludes its stop value.
    return iter(range(shortest, stop))
49
50
51
def _gen_unadjusted_cases(dtype):
    """
    Generate expected moving windows over a buffer with no adjustments.

    The buffer is a 6x3 array holding 0..17 in row-major order.  One case is
    yielded per legal window length, as a tuple of:

        (case name, data, window length, adjustments, expected windows)

    where the adjustments mapping is always empty and the expected windows
    are plain row slices of the data.
    """
    row_count, col_count = 6, 3
    data = arange(row_count * col_count, dtype=dtype)
    data = data.reshape(row_count, col_count)

    for windowlen in valid_window_lengths(row_count):
        window_count = num_windows_of_length_M_on_buffers_of_length_N(
            windowlen, row_count
        )

        # With no adjustments, each window is just the rows it covers.
        expected_windows = []
        for start in range(window_count):
            expected_windows.append(data[start:start + windowlen])

        case_name = "length_%d" % windowlen
        yield (case_name, data, windowlen, {}, expected_windows)
73
74
75
def _gen_multiplicative_adjustment_cases(dtype):
    """
    Generate expected moving windows on a buffer with multiplicative
    adjustments.

    For every row index we spell out the buffer we expect to see in any
    window whose last row is that index.  An adjustment keyed on row N is
    expected to have been applied to the underlying buffer for every window
    that reaches row N, so each snapshot accumulates all adjustments keyed on
    that row or an earlier one.

    The snapshots and adjustments are then handed to _gen_expectations, which
    builds all legal windows over them.
    """
    adjustment_type = {
        float: Float64Multiply,
    }[dtype]

    nrows, ncols = 6, 3
    baseline = full((nrows, ncols), 1, dtype=dtype)

    # Note that row indices are inclusive!
    # NOTE(review): positional arguments look like
    # (first_row, last_row, first_col, last_col, value) -- confirm against
    # zipline.lib.adjustment.
    adjustments = {
        1: [
            adjustment_type(0, 0, 0, 0, dtype(2)),
        ],
        # No adjustment at index 2.
        3: [
            adjustment_type(1, 2, 1, 1, dtype(3)),
            adjustment_type(0, 1, 0, 0, dtype(4)),
        ],
        4: [
            adjustment_type(0, 3, 2, 2, dtype(5)),
        ],
        5: [
            adjustment_type(0, 4, 1, 1, dtype(6)),
            adjustment_type(2, 2, 2, 2, dtype(7)),
        ],
    }

    as_of_row_1 = array([[2, 1, 1],
                         [1, 1, 1],
                         [1, 1, 1],
                         [1, 1, 1],
                         [1, 1, 1],
                         [1, 1, 1]], dtype=dtype)

    as_of_row_3 = array([[8, 1, 1],
                         [4, 3, 1],
                         [1, 3, 1],
                         [1, 1, 1],
                         [1, 1, 1],
                         [1, 1, 1]], dtype=dtype)

    as_of_row_4 = array([[8, 1, 5],
                         [4, 3, 5],
                         [1, 3, 5],
                         [1, 1, 5],
                         [1, 1, 1],
                         [1, 1, 1]], dtype=dtype)

    as_of_row_5 = array([[8,  6,  5],
                         [4, 18,  5],
                         [1, 18, 35],
                         [1,  6,  5],
                         [1,  6,  1],
                         [1,  1,  1]], dtype=dtype)

    # Indexed by the last row of a window.  Row 2 has no adjustment of its
    # own, so it reuses the row-1 snapshot.
    buffer_as_of = [
        baseline,
        as_of_row_1,
        as_of_row_1,
        as_of_row_3,
        as_of_row_4,
        as_of_row_5,
    ]

    return _gen_expectations(baseline, adjustments, buffer_as_of, nrows)
144
145
146
def _gen_overwrite_adjustment_cases(dtype):
    """
    Generate expected moving windows on a buffer with overwrite adjustments.

    The approach mirrors the multiplicative cases: for every row index we
    spell out the buffer expected in any window whose last row is that index,
    then hand the snapshots to _gen_expectations.  The only difference is
    that each adjustment replaces the values in its region instead of
    scaling them.
    """
    adjustment_type = {
        float: Float64Overwrite,
    }[dtype]

    nrows, ncols = 6, 3
    baseline = full((nrows, ncols), 2, dtype=dtype)

    # Note that row indices are inclusive!
    # NOTE(review): positional arguments look like
    # (first_row, last_row, first_col, last_col, value) -- confirm against
    # zipline.lib.adjustment.
    adjustments = {
        1: [
            adjustment_type(0, 0, 0, 0, dtype(1)),
        ],
        # No adjustment at index 2.
        3: [
            adjustment_type(1, 2, 1, 1, dtype(3)),
            adjustment_type(0, 1, 0, 0, dtype(4)),
        ],
        4: [
            adjustment_type(0, 3, 2, 2, dtype(5)),
        ],
        5: [
            adjustment_type(0, 4, 1, 1, dtype(6)),
            adjustment_type(2, 2, 2, 2, dtype(7)),
        ],
    }

    as_of_row_1 = array([[1, 2, 2],
                         [2, 2, 2],
                         [2, 2, 2],
                         [2, 2, 2],
                         [2, 2, 2],
                         [2, 2, 2]], dtype=dtype)

    as_of_row_3 = array([[4, 2, 2],
                         [4, 3, 2],
                         [2, 3, 2],
                         [2, 2, 2],
                         [2, 2, 2],
                         [2, 2, 2]], dtype=dtype)

    as_of_row_4 = array([[4, 2, 5],
                         [4, 3, 5],
                         [2, 3, 5],
                         [2, 2, 5],
                         [2, 2, 2],
                         [2, 2, 2]], dtype=dtype)

    as_of_row_5 = array([[4,  6,  5],
                         [4,  6,  5],
                         [2,  6,  7],
                         [2,  6,  5],
                         [2,  6,  2],
                         [2,  2,  2]], dtype=dtype)

    # Indexed by the last row of a window.  Row 2 has no adjustment of its
    # own, so it reuses the row-1 snapshot.
    buffer_as_of = [
        baseline,
        as_of_row_1,
        as_of_row_1,
        as_of_row_3,
        as_of_row_4,
        as_of_row_5,
    ]

    return _gen_expectations(
        baseline,
        adjustments,
        buffer_as_of,
        nrows,
    )
216
217
218
def _gen_expectations(baseline, adjustments, buffer_as_of, nrows):
    """
    Yield one parameterized test case per legal window length.

    Each case is a tuple of:

        (case name, baseline, window length, adjustments, expected windows)

    buffer_as_of[i] is the buffer expected to be visible to a window whose
    last row is i; every expected window is sliced out of the snapshot that
    matches its own last row.
    """
    for windowlen in valid_window_lengths(nrows):
        window_count = num_windows_of_length_M_on_buffers_of_length_N(
            windowlen, nrows
        )

        expected_windows = []
        for start in range(window_count):
            stop = start + windowlen
            # The window covers rows [start, stop), so its last row is
            # stop - 1.  Use the snapshot with every adjustment up to and
            # including that row applied.
            snapshot = buffer_as_of[stop - 1]
            expected_windows.append(snapshot[start:stop])

        case_name = "length_%d" % windowlen
        yield (case_name, baseline, windowlen, adjustments, expected_windows)
242
243
244
class AdjustedArrayTestCase(TestCase):
    """
    Tests for adjusted_array: window traversal with and without adjustments,
    window-length validation, read-only views, mask validation and inspect().
    """

    def _check_traversal(self, data, lookback, adjustments, expected):
        """
        Build an adjusted_array and check the windows it yields.

        Traverses the array with windows of length `lookback` and compares
        each yielded window against the matching entry of `expected`.
        zip_longest pads the shorter side with None, so a mismatch in the
        number of windows fails the comparison instead of being silently
        truncated.
        """
        adj_array = adjusted_array(
            data,
            NOMASK,
            adjustments,
        )
        for _ in range(2):  # Iterate 2x ensure adjusted_arrays are re-usable.
            window_iter = adj_array.traverse(lookback)
            for yielded, expected_yield in zip_longest(window_iter, expected):
                assert_array_equal(yielded, expected_yield)

    @parameterized.expand(_gen_unadjusted_cases(float))
    def test_no_adjustments(self,
                            name,
                            data,
                            lookback,
                            adjustments,
                            expected):
        """Windows over unadjusted data are plain slices of the data."""
        self._check_traversal(data, lookback, adjustments, expected)

    @parameterized.expand(_gen_multiplicative_adjustment_cases(float))
    def test_multiplicative_adjustments(self,
                                        name,
                                        data,
                                        lookback,
                                        adjustments,
                                        expected):
        """Windows reflect multiplicative adjustments as rows are reached."""
        self._check_traversal(data, lookback, adjustments, expected)

    @parameterized.expand(_gen_overwrite_adjustment_cases(float))
    def test_overwrite_adjustment_cases(self,
                                        name,
                                        data,
                                        lookback,
                                        adjustments,
                                        expected):
        """Windows reflect overwrite adjustments as rows are reached."""
        self._check_traversal(data, lookback, adjustments, expected)

    def test_invalid_lookback(self):
        """traverse() rejects window lengths outside 1..number of rows."""
        data = arange(30, dtype=float).reshape(6, 5)
        adj_array = adjusted_array(data, NOMASK, {})

        # The data has 6 rows, so a window of 7 cannot fit.
        with self.assertRaises(WindowLengthTooLong):
            adj_array.traverse(7)

        with self.assertRaises(WindowLengthNotPositive):
            adj_array.traverse(0)

        with self.assertRaises(WindowLengthNotPositive):
            adj_array.traverse(-1)

    def test_array_views_arent_writable(self):
        """Assigning into a yielded window raises ValueError."""
        data = arange(30, dtype=float).reshape(6, 5)
        adj_array = adjusted_array(data, NOMASK, {})

        for frame in adj_array.traverse(3):
            with self.assertRaises(ValueError):
                frame[0, 0] = 5.0

    def test_bad_input(self):
        """A mask whose shape differs from the data raises ValueError."""
        # Raw string: \( and \) are regex escapes, not string escapes.  In a
        # plain string literal they are invalid escape sequences, which
        # newer Pythons warn about.
        msg = r"Mask shape \(2, 3\) != data shape \(5, 5\)"
        data = arange(25).reshape(5, 5)
        bad_mask = array([[0, 1, 1], [0, 0, 1]], dtype=bool)

        # Python 3 names this assertRaisesRegex and 3.12 removed the old
        # assertRaisesRegexp alias; Python 2 only has the old name.
        try:
            assert_raises_regex = self.assertRaisesRegex
        except AttributeError:
            assert_raises_regex = self.assertRaisesRegexp

        with assert_raises_regex(ValueError, msg):
            adjusted_array(data, bad_mask, {})

    def test_inspect(self):
        """inspect() renders the data and the adjustments as text."""
        data = arange(15, dtype=float).reshape(5, 3)
        adj_array = adjusted_array(
            data,
            NOMASK,
            {4: [Float64Multiply(2, 3, 0, 0, 4.0)]},
        )

        # NOTE(review): the "Data:" section looks like numpy's array repr,
        # whose spacing may differ between numpy versions -- confirm against
        # the numpy version under test.
        expected = dedent(
            """\
            Adjusted Array:

            Data:
            array([[  0.,   1.,   2.],
                   [  3.,   4.,   5.],
                   [  6.,   7.,   8.],
                   [  9.,  10.,  11.],
                   [ 12.,  13.,  14.]])

            Adjustments:
            {4: [Float64Multiply(first_row=2, last_row=3, first_col=0, \
last_col=0, value=4.000000)]}
            """
        )
        self.assertEqual(expected, adj_array.inspect())
353