Completed
Pull Request — master (#905)
by
unknown
04:05
created

setup()   D

Complexity

Conditions 8

Size

Total Lines 111

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 8
dl 0
loc 111
rs 4

6 Methods

Rating   Name   Duplication   Size   Complexity  
A tests.pipeline.EarningsCalendarLoaderTestCase.expected_next_announce() 0 5 1
A tests.pipeline.EarningsCalendarLoaderTestCase.expected_previous_announce() 0 5 1
A tests.pipeline.EarningsCalendarLoaderTestCase.expected_previous_busday_offset() 0 6 1
A tests.pipeline.EarningsCalendarLoaderTestCase.expected_next_busday_offset() 0 6 1
A tests.pipeline.EarningsCalendarLoaderTestCase.zip_with_dates() 0 2 1
A tests.pipeline.EarningsCalendarLoaderTestCase.num_days_between() 0 2 1

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
"""
2
Tests for the reference loader for EarningsCalendar.
3
"""
4
from unittest import TestCase
5
6
import blaze as bz
7
from contextlib2 import ExitStack
8
from nose_parameterized import parameterized
9
import pandas as pd
10
import numpy as np
11
from pandas.util.testing import assert_series_equal
12
from six import iteritems
13
14
from zipline.pipeline import Pipeline
15
from zipline.pipeline.data import EarningsCalendar
16
from zipline.pipeline.engine import SimplePipelineEngine
17
from zipline.pipeline.factors.events import (
18
    BusinessDaysUntilNextEarnings,
19
    BusinessDaysSincePreviousEarnings,
20
)
21
from zipline.pipeline.loaders.earnings import EarningsCalendarLoader
22
from zipline.pipeline.loaders.blaze import (
23
    ANCMT_FIELD_NAME,
24
    BlazeEarningsCalendarLoader,
25
    SID_FIELD_NAME,
26
    TS_FIELD_NAME,
27
)
28
from zipline.utils.numpy_utils import make_datetime64D, np_NaT
29
from zipline.utils.tradingcalendar import trading_days
30
from zipline.utils.test_utils import (
31
    make_simple_equity_info,
32
    powerset,
33
    tmp_asset_finder,
34
)
35
36
37
def _to_series(knowledge_dates, earning_dates):
38
    """
39
    Helper for converting a dict of strings to a Series of datetimes.
40
41
    This is just for making the test cases more readable.
42
    """
43
    return pd.Series(
44
        index=pd.to_datetime(knowledge_dates),
45
        data=pd.to_datetime(earning_dates),
46
    )
47
48
49
def num_days_in_range(dates, start, end):
    """
    Count the entries of ``dates`` falling between ``start`` and ``end``,
    inclusive on both sides.

    ``start`` or ``end`` may be None, meaning "unbounded" on that side.
    """
    bounds = dates.slice_locs(start, end)
    return bounds[1] - bounds[0]


def gen_calendars():
    """
    Yield one-tuples of date calendars for parameterizing the earnings
    pipeline tests.

    Every subset of the critical announcement/knowledge dates is dropped
    from a full January 2014 calendar, and the real trading calendar over
    the same window is yielded last.
    """
    start, stop = '2014-01-01', '2014-01-31'
    full_calendar = pd.date_range(start, stop, tz='utc')

    # Announcements and knowledge dates land on these days; exercise every
    # combination of them being absent from the calendar.
    critical_dates = pd.to_datetime([
        '2014-01-05',
        '2014-01-10',
        '2014-01-15',
        '2014-01-20',
    ])
    for subset in powerset(critical_dates):
        # parameterized.expand expects argument tuples.
        yield (full_calendar.drop(list(subset)),)

    # Finally, run against the real trading calendar.
    yield (trading_days[trading_days.slice_indexer(start, stop)],)


class EarningsCalendarLoaderTestCase(TestCase):
    """
    Tests for loading the earnings announcement data.
    """
    # Subclasses override this to exercise other loader implementations
    # against the same expectations.
    loader_type = EarningsCalendarLoader

    @classmethod
    def setUpClass(cls):
        cls._cleanup_stack = stack = ExitStack()
        cls.sids = A, B, C, D, E = range(5)
        equity_info = make_simple_equity_info(
            cls.sids,
            start_date=pd.Timestamp('2013-01-01', tz='UTC'),
            end_date=pd.Timestamp('2015-01-01', tz='UTC'),
        )
        cls.finder = stack.enter_context(
            tmp_asset_finder(equities=equity_info),
        )

        # Per-sid series mapping knowledge date -> announced earnings date.
        # The comments describe the relative ordering of the knowledge dates
        # (K1, K2) and the announced earnings dates (E1, E2).
        cls.earnings_dates = {
            # K1--K2--E1--E2.
            A: _to_series(
                knowledge_dates=['2014-01-05', '2014-01-10'],
                earning_dates=['2014-01-15', '2014-01-20'],
            ),
            # K1--K2--E2--E1.
            B: _to_series(
                knowledge_dates=['2014-01-05', '2014-01-10'],
                earning_dates=['2014-01-20', '2014-01-15'],
            ),
            # K1--E1--K2--E2.
            C: _to_series(
                knowledge_dates=['2014-01-05', '2014-01-15'],
                earning_dates=['2014-01-10', '2014-01-20'],
            ),
            # K1 == K2.
            D: _to_series(
                knowledge_dates=['2014-01-05'] * 2,
                earning_dates=['2014-01-10', '2014-01-15'],
            ),
            # Sid with no announcements at all.
            E: pd.Series(
                data=[],
                index=pd.DatetimeIndex([]),
                dtype='datetime64[ns]',
            ),
        }

    @classmethod
    def tearDownClass(cls):
        cls._cleanup_stack.close()

    def loader_args(self, dates):
        """Construct the base earnings announcements object to pass to the
        loader.

        Parameters
        ----------
        dates : pd.DatetimeIndex
            The dates we can serve.

        Returns
        -------
        args : tuple[any]
            The arguments to forward to the loader positionally.
        """
        return dates, self.earnings_dates

    @staticmethod
    def _make_date_helpers(dates):
        """Return ``(num_days_between, zip_with_dates)`` closures over
        ``dates``.
        """
        def num_days_between(start_date, end_date):
            # Inclusive count of calendar entries between the endpoints;
            # either endpoint may be None (unbounded).
            return num_days_in_range(dates, start_date, end_date)

        def zip_with_dates(dts):
            # Align a list of date strings (or 'NaT') against the calendar.
            return pd.Series(pd.to_datetime(dts), index=dates)

        return num_days_between, zip_with_dates

    def _make_expected_next_announce(self, dates):
        """Build the expected ``next_announcement`` values: one column per
        sid, indexed by ``dates``.
        """
        A, B, C, D, E = self.sids
        num_days_between, zip_with_dates = self._make_date_helpers(dates)
        return pd.DataFrame({
            A: zip_with_dates(
                ['NaT'] * num_days_between(None, '2014-01-04') +
                ['2014-01-15'] * num_days_between('2014-01-05', '2014-01-15') +
                ['2014-01-20'] * num_days_between('2014-01-16', '2014-01-20') +
                ['NaT'] * num_days_between('2014-01-21', None)
            ),
            B: zip_with_dates(
                ['NaT'] * num_days_between(None, '2014-01-04') +
                ['2014-01-20'] * num_days_between('2014-01-05', '2014-01-09') +
                ['2014-01-15'] * num_days_between('2014-01-10', '2014-01-15') +
                ['2014-01-20'] * num_days_between('2014-01-16', '2014-01-20') +
                ['NaT'] * num_days_between('2014-01-21', None)
            ),
            C: zip_with_dates(
                ['NaT'] * num_days_between(None, '2014-01-04') +
                ['2014-01-10'] * num_days_between('2014-01-05', '2014-01-10') +
                ['NaT'] * num_days_between('2014-01-11', '2014-01-14') +
                ['2014-01-20'] * num_days_between('2014-01-15', '2014-01-20') +
                ['NaT'] * num_days_between('2014-01-21', None)
            ),
            D: zip_with_dates(
                ['NaT'] * num_days_between(None, '2014-01-04') +
                ['2014-01-10'] * num_days_between('2014-01-05', '2014-01-10') +
                ['2014-01-15'] * num_days_between('2014-01-11', '2014-01-15') +
                ['NaT'] * num_days_between('2014-01-16', None)
            ),
            E: zip_with_dates(['NaT'] * len(dates)),
        }, index=dates)

    def _make_expected_previous_announce(self, dates):
        """Build the expected ``previous_announcement`` values: one column
        per sid, indexed by ``dates``.
        """
        A, B, C, D, E = self.sids
        num_days_between, zip_with_dates = self._make_date_helpers(dates)
        return pd.DataFrame({
            A: zip_with_dates(
                ['NaT'] * num_days_between(None, '2014-01-14') +
                ['2014-01-15'] * num_days_between('2014-01-15', '2014-01-19') +
                ['2014-01-20'] * num_days_between('2014-01-20', None)
            ),
            B: zip_with_dates(
                ['NaT'] * num_days_between(None, '2014-01-14') +
                ['2014-01-15'] * num_days_between('2014-01-15', '2014-01-19') +
                ['2014-01-20'] * num_days_between('2014-01-20', None)
            ),
            C: zip_with_dates(
                ['NaT'] * num_days_between(None, '2014-01-09') +
                ['2014-01-10'] * num_days_between('2014-01-10', '2014-01-19') +
                ['2014-01-20'] * num_days_between('2014-01-20', None)
            ),
            D: zip_with_dates(
                ['NaT'] * num_days_between(None, '2014-01-09') +
                ['2014-01-10'] * num_days_between('2014-01-10', '2014-01-14') +
                ['2014-01-15'] * num_days_between('2014-01-15', None)
            ),
            E: zip_with_dates(['NaT'] * len(dates)),
        }, index=dates)

    def setup(self, dates):
        """
        Make a PipelineEngine and expectation functions for the given dates
        calendar.

        This exists to make it easy to test our various cases with critical
        dates missing from the calendar.

        Returns
        -------
        (engine,
         expected_next_announce,
         expected_next_busday_offset,
         expected_previous_announce,
         expected_previous_busday_offset)
            The engine plus four ``sid -> expected Series`` functions.
        """
        _expected_next_announce = self._make_expected_next_announce(dates)
        _expected_previous_announce = self._make_expected_previous_announce(
            dates,
        )

        _expected_next_busday_offsets = self._compute_busday_offsets(
            _expected_next_announce
        )
        _expected_previous_busday_offsets = self._compute_busday_offsets(
            _expected_previous_announce
        )

        def expected_next_announce(sid):
            """
            Return the expected next announcement dates for ``sid``.
            """
            return _expected_next_announce[sid]

        def expected_next_busday_offset(sid):
            """
            Return the expected number of days to the next announcement for
            ``sid``.
            """
            return _expected_next_busday_offsets[sid]

        def expected_previous_announce(sid):
            """
            Return the expected previous announcement dates for ``sid``.
            """
            return _expected_previous_announce[sid]

        def expected_previous_busday_offset(sid):
            """
            Return the expected number of days since the previous
            announcement for ``sid``.
            """
            return _expected_previous_busday_offsets[sid]

        loader = self.loader_type(*self.loader_args(dates))
        engine = SimplePipelineEngine(lambda _: loader, dates, self.finder)
        return (
            engine,
            expected_next_announce,
            expected_next_busday_offset,
            expected_previous_announce,
            expected_previous_busday_offset,
        )

    @staticmethod
    def _compute_busday_offsets(announcement_dates):
        """
        Compute expected business day offsets from a DataFrame of announcement
        dates.
        """
        # Column-vector of dates on which factor `compute` will be called.
        raw_call_dates = announcement_dates.index.values.astype(
            'datetime64[D]'
        )[:, None]

        # 2D array of dates containing the expected next announcement.
        raw_announce_dates = (
            announcement_dates.values.astype('datetime64[D]')
        )

        # Set NaTs to 0 temporarily because busday_count doesn't support NaT.
        # We fill these entries with NaNs later.
        whereNaT = raw_announce_dates == np_NaT
        raw_announce_dates[whereNaT] = make_datetime64D(0)

        # The abs call here makes it so that we can use this function to
        # compute offsets for both next and previous earnings (previous
        # earnings offsets come back negative).
        expected = abs(np.busday_count(
            raw_call_dates,
            raw_announce_dates
        ).astype(float))

        expected[whereNaT] = np.nan
        return pd.DataFrame(
            data=expected,
            columns=announcement_dates.columns,
            index=announcement_dates.index,
        )

    @parameterized.expand(gen_calendars())
    def test_compute_earnings(self, dates):
        # Build the engine and expectation functions for this calendar.
        (
            engine,
            expected_next,
            expected_next_busday_offset,
            expected_previous,
            expected_previous_busday_offset,
        ) = self.setup(dates)

        pipe = Pipeline(
            columns={
                'next': EarningsCalendar.next_announcement.latest,
                'previous': EarningsCalendar.previous_announcement.latest,
                'days_to_next': BusinessDaysUntilNextEarnings(),
                'days_since_prev': BusinessDaysSincePreviousEarnings(),
            }
        )

        result = engine.run_pipeline(
            pipe,
            start_date=dates[0],
            end_date=dates[-1],
        )

        computed_next = result['next']
        computed_previous = result['previous']
        computed_next_busday_offset = result['days_to_next']
        computed_previous_busday_offset = result['days_since_prev']

        # NaTs in next/prev should correspond to NaNs in offsets.
        assert_series_equal(
            computed_next.isnull(),
            computed_next_busday_offset.isnull(),
        )
        assert_series_equal(
            computed_previous.isnull(),
            computed_previous_busday_offset.isnull(),
        )

        # Compare each sid's column of the pipeline output against the
        # hand-built expectations.
        for sid in self.sids:

            assert_series_equal(
                computed_next.xs(sid, level=1),
                expected_next(sid),
                sid,
            )

            assert_series_equal(
                computed_previous.xs(sid, level=1),
                expected_previous(sid),
                sid,
            )

            assert_series_equal(
                computed_next_busday_offset.xs(sid, level=1),
                expected_next_busday_offset(sid),
                sid,
            )

            assert_series_equal(
                computed_previous_busday_offset.xs(sid, level=1),
                expected_previous_busday_offset(sid),
                sid,
            )


class BlazeEarningsCalendarLoaderTestCase(EarningsCalendarLoaderTestCase):
    """
    Run the earnings calendar expectations against the blaze-backed loader.
    """
    loader_type = BlazeEarningsCalendarLoader

    def loader_args(self, dates):
        """Return a one-tuple holding an interactive blaze expression built
        from the per-sid earnings mapping of the parent class.
        """
        _, mapping = super(
            BlazeEarningsCalendarLoaderTestCase,
            self,
        ).loader_args(dates)
        # One frame per sid: announcement date, timestamp (knowledge date),
        # and the sid itself, stacked into a single long table.
        frames = [
            pd.DataFrame({
                ANCMT_FIELD_NAME: earning_dates,
                TS_FIELD_NAME: earning_dates.index,
                SID_FIELD_NAME: sid,
            })
            for sid, earning_dates in iteritems(mapping)
        ]
        combined = pd.concat(frames).reset_index(drop=True)
        return (bz.Data(combined),)


class BlazeEarningsCalendarLoaderNotInteractiveTestCase(
        BlazeEarningsCalendarLoaderTestCase):
    """Test case for passing a non-interactive symbol and a dict of resources.
    """
    def loader_args(self, dates):
        """Unbind the parent's interactive expression into a plain symbol
        plus the explicit resource it was bound to.
        """
        (bound_expr,) = super(
            BlazeEarningsCalendarLoaderNotInteractiveTestCase,
            self,
        ).loader_args(dates)
        unbound = bz.symbol('expr', bound_expr.dshape)
        resources = bound_expr._resources()[bound_expr]
        return (unbound, resources)
395