Completed
Push — master ( ebb4fb...323695 )
by
unknown
01:25
created

test_masked_rankdata_2d()   B

Complexity

Conditions 3

Size

Total Lines 35

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 3
dl 0
loc 35
rs 8.8571
1
"""
2
Tests for Factor terms.
3
"""
4
from itertools import product
5
from nose_parameterized import parameterized
6
7
from numpy import (
8
    arange,
9
    array,
10
    datetime64,
11
    empty,
12
    eye,
13
    nan,
14
    ones,
15
)
16
from numpy.random import randn, seed
17
18
from zipline.errors import UnknownRankMethod
19
from zipline.lib.rank import masked_rankdata_2d
20
from zipline.pipeline import Factor, Filter, TermGraph
21
from zipline.pipeline.factors import (
22
    Returns,
23
    RSI,
24
)
25
from zipline.utils.test_utils import check_allclose, check_arrays
26
from zipline.utils.numpy_utils import datetime64ns_dtype, float64_dtype, np_NaT
27
28
from .base import BasePipelineTestCase
29
30
31
class F(Factor):
    """
    Bare Factor used as a fixture by the tests in this module.

    It declares no inputs and a zero-length window; tests supply its values
    directly through the ``initial_workspace`` passed to ``run_graph``.
    """
    window_length = 0
    inputs = ()
    dtype = float64_dtype
35
36
37
class Mask(Filter):
    """
    Bare Filter used as a fixture for the ``mask=`` argument of ``rank``.

    It declares no inputs and a zero-length window; tests supply its boolean
    values directly through the ``initial_workspace`` passed to ``run_graph``.
    """
    window_length = 0
    inputs = ()
40
41
42
# Decorator that expands a test method into one case per factor dtype
# exercised in this module. Each case passes (name, dtype) as the extra
# positional arguments of the decorated test.
_factor_dtype_cases = [
    ('datetime64[ns]', datetime64ns_dtype),
    ('float', float64_dtype),
]
for_each_factor_dtype = parameterized.expand(_factor_dtype_cases)
46
47
48
class FactorTestCase(BasePipelineTestCase):
49
50
    def setUp(self):
        # Run the base-class setup first, then create a default-dtype factor
        # for tests that do not need to pick a dtype themselves.
        super(FactorTestCase, self).setUp()
        self.f = F()
53
54
    def test_bad_input(self):
        # Asking for a ranking method that does not exist must raise
        # UnknownRankMethod.
        bogus_method = "not a real rank method"
        with self.assertRaises(UnknownRankMethod):
            self.f.rank(bogus_method)
57
58
    @for_each_factor_dtype
    def test_rank_ascending(self, name, factor_dtype):
        # Ascending ranks must match the hand-written expectations for every
        # tie-breaking method, both when ``ascending`` is left at its default
        # and when ``ascending=True`` is passed explicitly.
        # ``name`` only labels the parameterized case; it is not used here.
        factor = F(dtype=factor_dtype)

        # Generated with:
        # data = arange(25).reshape(5, 5).transpose() % 4
        raw_values = array([[0, 1, 2, 3, 0],
                            [1, 2, 3, 0, 1],
                            [2, 3, 0, 1, 2],
                            [3, 0, 1, 2, 3],
                            [0, 1, 2, 3, 0]], dtype=factor_dtype)

        # Expected output per rank method, keyed by method name.
        expected_ranks = {
            'ordinal': array([[1., 3., 4., 5., 2.],
                              [2., 4., 5., 1., 3.],
                              [3., 5., 1., 2., 4.],
                              [4., 1., 2., 3., 5.],
                              [1., 3., 4., 5., 2.]]),
            'average': array([[1.5, 3., 4., 5., 1.5],
                              [2.5, 4., 5., 1., 2.5],
                              [3.5, 5., 1., 2., 3.5],
                              [4.5, 1., 2., 3., 4.5],
                              [1.5, 3., 4., 5., 1.5]]),
            'min': array([[1., 3., 4., 5., 1.],
                          [2., 4., 5., 1., 2.],
                          [3., 5., 1., 2., 3.],
                          [4., 1., 2., 3., 4.],
                          [1., 3., 4., 5., 1.]]),
            'max': array([[2., 3., 4., 5., 2.],
                          [3., 4., 5., 1., 3.],
                          [4., 5., 1., 2., 4.],
                          [5., 1., 2., 3., 5.],
                          [2., 3., 4., 5., 2.]]),
            'dense': array([[1., 2., 3., 4., 1.],
                            [2., 3., 4., 1., 2.],
                            [3., 4., 1., 2., 3.],
                            [4., 1., 2., 3., 4.],
                            [1., 2., 3., 4., 1.]]),
        }
        method_names = list(expected_ranks)

        def assert_ranks(terms):
            # Compute every term in ``terms`` against ``raw_values`` and
            # compare each output with the expectation stored under the
            # same key.
            outputs = self.run_graph(
                TermGraph(terms),
                initial_workspace={factor: raw_values},
                mask=self.build_mask(ones((5, 5))),
            )
            for key in terms:
                check_arrays(outputs[key], expected_ranks[key])

        # ``ascending`` omitted: ascending order is expected by default.
        assert_ranks({m: factor.rank(method=m) for m in method_names})
        # ``ascending`` passed explicitly.
        assert_ranks({
            m: factor.rank(method=m, ascending=True) for m in method_names
        })
        # Not passing a method should default to ordinal.
        assert_ranks({'ordinal': factor.rank()})
        assert_ranks({'ordinal': factor.rank(ascending=True)})
117
118
    @for_each_factor_dtype
    def test_rank_descending(self, name, factor_dtype):
        # Descending ranks (``ascending=False``) must match the hand-written
        # expectations for every tie-breaking method.
        # ``name`` only labels the parameterized case; it is not used here.
        factor = F(dtype=factor_dtype)

        # Generated with:
        # data = arange(25).reshape(5, 5).transpose() % 4
        raw_values = array([[0, 1, 2, 3, 0],
                            [1, 2, 3, 0, 1],
                            [2, 3, 0, 1, 2],
                            [3, 0, 1, 2, 3],
                            [0, 1, 2, 3, 0]], dtype=factor_dtype)

        # Expected output per rank method, keyed by method name.
        expected_ranks = {
            'ordinal': array([[4., 3., 2., 1., 5.],
                              [3., 2., 1., 5., 4.],
                              [2., 1., 5., 4., 3.],
                              [1., 5., 4., 3., 2.],
                              [4., 3., 2., 1., 5.]]),
            'average': array([[4.5, 3., 2., 1., 4.5],
                              [3.5, 2., 1., 5., 3.5],
                              [2.5, 1., 5., 4., 2.5],
                              [1.5, 5., 4., 3., 1.5],
                              [4.5, 3., 2., 1., 4.5]]),
            'min': array([[4., 3., 2., 1., 4.],
                          [3., 2., 1., 5., 3.],
                          [2., 1., 5., 4., 2.],
                          [1., 5., 4., 3., 1.],
                          [4., 3., 2., 1., 4.]]),
            'max': array([[5., 3., 2., 1., 5.],
                          [4., 2., 1., 5., 4.],
                          [3., 1., 5., 4., 3.],
                          [2., 5., 4., 3., 2.],
                          [5., 3., 2., 1., 5.]]),
            'dense': array([[4., 3., 2., 1., 4.],
                            [3., 2., 1., 4., 3.],
                            [2., 1., 4., 3., 2.],
                            [1., 4., 3., 2., 1.],
                            [4., 3., 2., 1., 4.]]),
        }
        method_names = list(expected_ranks)

        def assert_ranks(terms):
            # Compute every term in ``terms`` against ``raw_values`` and
            # compare each output with the expectation stored under the
            # same key.
            outputs = self.run_graph(
                TermGraph(terms),
                initial_workspace={factor: raw_values},
                mask=self.build_mask(ones((5, 5))),
            )
            for key in terms:
                check_arrays(outputs[key], expected_ranks[key])

        assert_ranks({
            m: factor.rank(method=m, ascending=False) for m in method_names
        })
        # Not passing a method should default to ordinal.
        assert_ranks({'ordinal': factor.rank(ascending=False)})
174
175
    @for_each_factor_dtype
    def test_rank_after_mask(self, name, factor_dtype):
        # Ranking with ``mask=`` must leave masked-out cells as nan and rank
        # the remaining cells of each row among themselves; ranking without
        # a mask must be unaffected by the presence of the Mask term.
        # ``name`` only labels the parameterized case; it is not used here.
        factor = F(dtype=factor_dtype)

        # data = arange(25).reshape(5, 5).transpose() % 4
        raw_values = array([[0, 1, 2, 3, 0],
                            [1, 2, 3, 0, 1],
                            [2, 3, 0, 1, 2],
                            [3, 0, 1, 2, 3],
                            [0, 1, 2, 3, 0]], dtype=factor_dtype)
        # True everywhere except on the diagonal, so exactly one cell per
        # row is excluded from the masked rankings.
        off_diagonal = ~eye(5, dtype=bool)
        initial_workspace = {factor: raw_values, Mask(): off_diagonal}

        expected = {
            "ascending_nomask": array([[1., 3., 4., 5., 2.],
                                       [2., 4., 5., 1., 3.],
                                       [3., 5., 1., 2., 4.],
                                       [4., 1., 2., 3., 5.],
                                       [1., 3., 4., 5., 2.]]),
            "descending_nomask": array([[4., 3., 2., 1., 5.],
                                        [3., 2., 1., 5., 4.],
                                        [2., 1., 5., 4., 3.],
                                        [1., 5., 4., 3., 2.],
                                        [4., 3., 2., 1., 5.]]),
            # Diagonal should be all nans, and anything whose rank was
            # greater than the diagonal in the unmasked calc should go down
            # by 1.
            "ascending_mask": array([[nan, 2., 3., 4., 1.],
                                     [2., nan, 4., 1., 3.],
                                     [2., 4., nan, 1., 3.],
                                     [3., 1., 2., nan, 4.],
                                     [1., 2., 3., 4., nan]]),
            "descending_mask": array([[nan, 3., 2., 1., 4.],
                                      [2., nan, 1., 4., 3.],
                                      [2., 1., nan, 4., 3.],
                                      [1., 4., 3., nan, 2.],
                                      [4., 3., 2., 1., nan]]),
        }

        terms = {
            "ascending_nomask": factor.rank(ascending=True),
            "ascending_mask": factor.rank(ascending=True, mask=Mask()),
            "descending_nomask": factor.rank(ascending=False),
            "descending_mask": factor.rank(ascending=False, mask=Mask()),
        }
        graph = TermGraph(terms)

        results = self.run_graph(
            graph,
            initial_workspace,
            mask=self.build_mask(ones((5, 5))),
        )
        for key in results:
            check_arrays(expected[key], results[key])
229
230
    @parameterized.expand([
        # Test cases computed by doing:
        # from numpy.random import seed, randn
        # from talib import RSI
        # seed(seed_value)
        # data = abs(randn(15, 3))
        # expected = [RSI(data[:, i])[-1] for i in range(3)]
        (100, array([41.032913785966, 51.553585468393, 51.022005016446])),
        (101, array([43.506969935466, 46.145367530182, 50.57407044197])),
        (102, array([46.610102205934, 47.646892444315, 52.13182788538])),
    ])
    def test_rsi(self, seed_value, expected):
        # RSI.compute on seeded random data must reproduce the reference
        # values listed in the cases above (one value per column/asset).
        rsi = RSI()

        today = datetime64(1, 'ns')
        assets = arange(3)

        seed(seed_value)  # Seed so we get deterministic results.
        test_data = abs(randn(15, 3))

        # Output buffer handed to compute(); it is checked afterwards
        # without being reassigned, so compute() is expected to fill it.
        out = empty((3,), dtype=float)
        rsi.compute(today, assets, out, test_data)

        check_allclose(expected, out)
256
257
    @parameterized.expand([
        # (seed_value, window_length)
        (100, 15),
        (101, 4),
        (102, 100),
    ])
    def test_returns(self, seed_value, window_length):
        # Returns.compute must equal the percent change between the first
        # and last rows of the window, for several window lengths.
        returns = Returns(window_length=window_length)

        today = datetime64(1, 'ns')
        assets = arange(3)

        seed(seed_value)  # Seed so we get deterministic results.
        test_data = abs(randn(window_length, 3))

        # Calculate the expected returns
        expected = (test_data[-1] - test_data[0]) / test_data[0]

        # Output buffer handed to compute(); it is checked afterwards
        # without being reassigned, so compute() is expected to fill it.
        out = empty((3,), dtype=float)
        returns.compute(today, assets, out, test_data)

        check_allclose(expected, out)
280
281
    def gen_ranking_cases():
        # Build the parameter grid for test_masked_rankdata_2d: every
        # combination of seed, rank method and the three boolean switches,
        # yielded as (seed, method, use_mask, set_missing, ascending).
        # NOTE: this is called as a plain function while the class body is
        # being executed (by the decorator on test_masked_rankdata_2d), which
        # is why it takes no ``self``.
        both = (True, False)
        return product(
            range(10000, 100000, 10000),  # seeds: 10000, 20000, ..., 90000
            ('ordinal', 'average'),       # methods
            both,                         # use_mask
            both,                         # set_missing
            both,                         # ascending
        )
294
295
    @parameterized.expand(gen_ranking_cases())
    def test_masked_rankdata_2d(self,
                                seed_value,
                                method,
                                use_mask,
                                set_missing,
                                ascending):
        # masked_rankdata_2d must produce the same ranks for float64 data
        # and for the very same bytes reinterpreted as datetime64[ns], for
        # every combination of method, mask, missing values and sort order.
        eyemask = ~eye(5, dtype=bool)
        nomask = ones((5, 5), dtype=bool)

        seed(seed_value)
        asfloat = (randn(5, 5) * seed_value)
        # Same underlying bytes, viewed as datetimes (copy so that the
        # missing-value writes below do not alias the float array).
        asdatetime = (asfloat).copy().view('datetime64[ns]')

        mask = eyemask if use_mask else nomask
        if set_missing:
            # Mark the same column as missing in both representations, each
            # with its own dtype's missing value.
            asfloat[:, 2] = nan
            asdatetime[:, 2] = np_NaT

        # Forward the parameterized ``ascending`` flag so that the
        # descending cases of the grid are actually exercised.
        float_result = masked_rankdata_2d(
            data=asfloat,
            mask=mask,
            missing_value=nan,
            method=method,
            ascending=ascending,
        )
        datetime_result = masked_rankdata_2d(
            data=asdatetime,
            mask=mask,
            missing_value=np_NaT,
            method=method,
            ascending=ascending,
        )

        check_arrays(float_result, datetime_result)
330