Completed
Pull Request — master (#905)
by
unknown
01:18
created

test_rank_descending()   A

Complexity

Conditions 4

Size

Total Lines 56

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 4
dl 0
loc 56
rs 9.0544

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method: move a cohesive part of the long method into its own well-named method.

1
"""
2
Tests for Factor terms.
3
"""
4
from itertools import product
5
from nose_parameterized import parameterized
6
7
from numpy import (
8
    arange,
9
    array,
10
    datetime64,
11
    empty,
12
    eye,
13
    nan,
14
    ones,
15
)
16
from numpy.random import randn, seed
17
18
from zipline.errors import UnknownRankMethod
19
from zipline.lib.rank import masked_rankdata_2d
20
from zipline.pipeline import Factor, Filter, TermGraph
21
from zipline.pipeline.factors import RSI, Returns
22
from zipline.utils.test_utils import check_allclose, check_arrays
23
from zipline.utils.numpy_utils import datetime64ns_dtype, float64_dtype, np_NaT
24
25
from .base import BasePipelineTestCase
26
27
28
class F(Factor):
    """Bare-bones Factor fixture: float64 dtype, no inputs, zero window."""
    window_length = 0
    inputs = ()
    dtype = float64_dtype
32
33
34
class Mask(Filter):
    """Bare-bones Filter fixture with no inputs and a zero window."""
    window_length = 0
    inputs = ()
37
38
39
# Decorator that expands a test method into one case per factor dtype.
# Each case is passed a readable label followed by the dtype itself.
for_each_factor_dtype = parameterized.expand(
    [
        ('datetime64[ns]', datetime64ns_dtype),
        ('float', float64_dtype),
    ]
)
43
44
45
class FactorTestCase(BasePipelineTestCase):
    """
    Tests for ``Factor.rank``, the ``RSI`` and ``Returns`` factors, and the
    low-level ``masked_rankdata_2d`` ranking routine.
    """

    def setUp(self):
        super(FactorTestCase, self).setUp()
        self.f = F()

    @staticmethod
    def _rank_input(factor_dtype):
        """
        Build the 5x5 input matrix shared by the ranking tests.

        Every row contains exactly one pair of tied values, which is what
        distinguishes the different tie-breaking rank methods.
        """
        # Generated with:
        # data = arange(25).reshape(5, 5).transpose() % 4
        return array([[0, 1, 2, 3, 0],
                      [1, 2, 3, 0, 1],
                      [2, 3, 0, 1, 2],
                      [3, 0, 1, 2, 3],
                      [0, 1, 2, 3, 0]], dtype=factor_dtype)

    def _check_ranks(self, factor, data, terms, expected_ranks):
        """
        Run ``terms`` through a TermGraph with ``data`` as the precomputed
        output of ``factor`` and compare each result with ``expected_ranks``.

        Parameters
        ----------
        factor : Factor
            The term whose output is seeded in the initial workspace.
        data : ndarray
            The 5x5 array to use as ``factor``'s output.
        terms : dict
            Maps a key of ``expected_ranks`` to the rank term to compute.
        expected_ranks : dict
            Maps the same keys to the expected result arrays.
        """
        results = self.run_graph(
            TermGraph(terms),
            initial_workspace={factor: data},
            mask=self.build_mask(ones((5, 5))),
        )
        for key in terms:
            check_arrays(results[key], expected_ranks[key])

    def test_bad_input(self):
        """An unrecognized rank method raises UnknownRankMethod."""
        with self.assertRaises(UnknownRankMethod):
            self.f.rank("not a real rank method")

    @for_each_factor_dtype
    def test_rank_ascending(self, name, factor_dtype):
        """Every rank method yields the expected ascending ranks."""
        f = F(dtype=factor_dtype)
        data = self._rank_input(factor_dtype)

        expected_ranks = {
            'ordinal': array([[1., 3., 4., 5., 2.],
                              [2., 4., 5., 1., 3.],
                              [3., 5., 1., 2., 4.],
                              [4., 1., 2., 3., 5.],
                              [1., 3., 4., 5., 2.]]),
            'average': array([[1.5, 3., 4., 5., 1.5],
                              [2.5, 4., 5., 1., 2.5],
                              [3.5, 5., 1., 2., 3.5],
                              [4.5, 1., 2., 3., 4.5],
                              [1.5, 3., 4., 5., 1.5]]),
            'min': array([[1., 3., 4., 5., 1.],
                          [2., 4., 5., 1., 2.],
                          [3., 5., 1., 2., 3.],
                          [4., 1., 2., 3., 4.],
                          [1., 3., 4., 5., 1.]]),
            'max': array([[2., 3., 4., 5., 2.],
                          [3., 4., 5., 1., 3.],
                          [4., 5., 1., 2., 4.],
                          [5., 1., 2., 3., 5.],
                          [2., 3., 4., 5., 2.]]),
            'dense': array([[1., 2., 3., 4., 1.],
                            [2., 3., 4., 1., 2.],
                            [3., 4., 1., 2., 3.],
                            [4., 1., 2., 3., 4.],
                            [1., 2., 3., 4., 1.]]),
        }

        def check(terms):
            self._check_ranks(f, data, terms, expected_ranks)

        # Ascending is the default, so omitting the flag and passing it
        # explicitly must agree.
        check({meth: f.rank(method=meth) for meth in expected_ranks})
        check({
            meth: f.rank(method=meth, ascending=True)
            for meth in expected_ranks
        })
        # Not passing a method should default to ordinal.
        check({'ordinal': f.rank()})
        check({'ordinal': f.rank(ascending=True)})

    @for_each_factor_dtype
    def test_rank_descending(self, name, factor_dtype):
        """Every rank method yields the expected descending ranks."""
        f = F(dtype=factor_dtype)
        data = self._rank_input(factor_dtype)

        expected_ranks = {
            'ordinal': array([[4., 3., 2., 1., 5.],
                              [3., 2., 1., 5., 4.],
                              [2., 1., 5., 4., 3.],
                              [1., 5., 4., 3., 2.],
                              [4., 3., 2., 1., 5.]]),
            'average': array([[4.5, 3., 2., 1., 4.5],
                              [3.5, 2., 1., 5., 3.5],
                              [2.5, 1., 5., 4., 2.5],
                              [1.5, 5., 4., 3., 1.5],
                              [4.5, 3., 2., 1., 4.5]]),
            'min': array([[4., 3., 2., 1., 4.],
                          [3., 2., 1., 5., 3.],
                          [2., 1., 5., 4., 2.],
                          [1., 5., 4., 3., 1.],
                          [4., 3., 2., 1., 4.]]),
            'max': array([[5., 3., 2., 1., 5.],
                          [4., 2., 1., 5., 4.],
                          [3., 1., 5., 4., 3.],
                          [2., 5., 4., 3., 2.],
                          [5., 3., 2., 1., 5.]]),
            'dense': array([[4., 3., 2., 1., 4.],
                            [3., 2., 1., 4., 3.],
                            [2., 1., 4., 3., 2.],
                            [1., 4., 3., 2., 1.],
                            [4., 3., 2., 1., 4.]]),
        }

        def check(terms):
            self._check_ranks(f, data, terms, expected_ranks)

        check({
            meth: f.rank(method=meth, ascending=False)
            for meth in expected_ranks
        })
        # Not passing a method should default to ordinal.
        check({'ordinal': f.rank(ascending=False)})

    @for_each_factor_dtype
    def test_rank_after_mask(self, name, factor_dtype):
        """Ranking with a mask excludes masked cells and re-ranks the rest."""
        f = F(dtype=factor_dtype)
        data = self._rank_input(factor_dtype)
        # Mask out the diagonal only.
        mask_data = ~eye(5, dtype=bool)
        initial_workspace = {f: data, Mask(): mask_data}

        graph = TermGraph(
            {
                "ascending_nomask": f.rank(ascending=True),
                "ascending_mask": f.rank(ascending=True, mask=Mask()),
                "descending_nomask": f.rank(ascending=False),
                "descending_mask": f.rank(ascending=False, mask=Mask()),
            }
        )

        expected = {
            "ascending_nomask": array([[1., 3., 4., 5., 2.],
                                       [2., 4., 5., 1., 3.],
                                       [3., 5., 1., 2., 4.],
                                       [4., 1., 2., 3., 5.],
                                       [1., 3., 4., 5., 2.]]),
            "descending_nomask": array([[4., 3., 2., 1., 5.],
                                        [3., 2., 1., 5., 4.],
                                        [2., 1., 5., 4., 3.],
                                        [1., 5., 4., 3., 2.],
                                        [4., 3., 2., 1., 5.]]),
            # Diagonal should be all nans, and anything whose rank was
            # greater than the diagonal's in the unmasked calc should go
            # down by 1.
            "ascending_mask": array([[nan, 2., 3., 4., 1.],
                                     [2., nan, 4., 1., 3.],
                                     [2., 4., nan, 1., 3.],
                                     [3., 1., 2., nan, 4.],
                                     [1., 2., 3., 4., nan]]),
            "descending_mask": array([[nan, 3., 2., 1., 4.],
                                      [2., nan, 1., 4., 3.],
                                      [2., 1., nan, 4., 3.],
                                      [1., 4., 3., nan, 2.],
                                      [4., 3., 2., 1., nan]]),
        }

        results = self.run_graph(
            graph,
            initial_workspace,
            mask=self.build_mask(ones((5, 5))),
        )
        for method in results:
            check_arrays(expected[method], results[method])

    @parameterized.expand([
        # Test cases computed by doing:
        # from numpy.random import seed, randn
        # from talib import RSI
        # seed(seed_value)
        # data = abs(randn(15, 3))
        # expected = [RSI(data[:, i])[-1] for i in range(3)]
        (100, array([41.032913785966, 51.553585468393, 51.022005016446])),
        (101, array([43.506969935466, 46.145367530182, 50.57407044197])),
        (102, array([46.610102205934, 47.646892444315, 52.13182788538])),
    ])
    def test_rsi(self, seed_value, expected):
        """RSI.compute reproduces the precomputed reference values."""
        rsi = RSI()

        today = datetime64(1, 'ns')
        assets = arange(3)

        seed(seed_value)  # Seed so we get deterministic results.
        test_data = abs(randn(15, 3))

        # compute() writes its result into ``out`` in place.
        out = empty((3,), dtype=float)
        rsi.compute(today, assets, out, test_data)

        check_allclose(expected, out)

    @parameterized.expand([
        (100, 15),
        (101, 4),
        (102, 100),
    ])
    def test_returns(self, seed_value, window_length):
        """Returns.compute equals (last - first) / first over the window."""
        returns = Returns(window_length=window_length)

        today = datetime64(1, 'ns')
        assets = arange(3)

        seed(seed_value)  # Seed so we get deterministic results.
        test_data = abs(randn(window_length, 3))

        # Calculate the expected returns
        expected = (test_data[-1] - test_data[0]) / test_data[0]

        # compute() writes its result into ``out`` in place.
        out = empty((3,), dtype=float)
        returns.compute(today, assets, out, test_data)

        check_allclose(expected, out)

    def gen_ranking_cases():
        # Called once, at class-definition time, to feed parameterized.expand
        # below; it is deliberately a plain function rather than a method.
        seeds = range(int(1e4), int(1e5), int(1e4))
        methods = ('ordinal', 'average')
        use_mask_values = (True, False)
        set_missing_values = (True, False)
        ascending_values = (True, False)
        return product(
            seeds,
            methods,
            use_mask_values,
            set_missing_values,
            ascending_values,
        )

    @parameterized.expand(gen_ranking_cases())
    def test_masked_rankdata_2d(self,
                                seed_value,
                                method,
                                use_mask,
                                set_missing,
                                ascending):
        """
        Ranking float data and the same bytes viewed as datetime64[ns] must
        give identical results for every method/mask/missing/direction case.
        """
        eyemask = ~eye(5, dtype=bool)
        nomask = ones((5, 5), dtype=bool)

        seed(seed_value)
        asfloat = (randn(5, 5) * seed_value)
        # Same underlying bytes, reinterpreted as datetimes.
        asdatetime = (asfloat).copy().view('datetime64[ns]')

        mask = eyemask if use_mask else nomask
        if set_missing:
            asfloat[:, 2] = nan
            asdatetime[:, 2] = np_NaT

        # Forward the parameterized direction to both calls so that the
        # descending cases actually exercise descending ranking.
        float_result = masked_rankdata_2d(
            data=asfloat,
            mask=mask,
            missing_value=nan,
            method=method,
            ascending=ascending,
        )
        datetime_result = masked_rankdata_2d(
            data=asdatetime,
            mask=mask,
            missing_value=np_NaT,
            method=method,
            ascending=ascending,
        )

        check_arrays(float_result, datetime_result)
327